package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.namedtopology.AddNamedTopologyResult;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStoreQueryParameters;
import org.apache.kafka.streams.processor.internals.namedtopology.RemoveNamedTopologyResult;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.StreamsMetadataImpl;
import org.apache.kafka.streams.utils.UniqueTopicSerdeScope;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Tag("integration")
@Timeout(600)
/* loaded from: input_file:org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.class */
public class NamedTopologyIntegrationTest {
    private static final String TOPOLOGY_1 = "topology-1";
    private static final String TOPOLOGY_2 = "topology-2";
    private static final String TOPOLOGY_3 = "topology-3";
    private static final String INPUT_STREAM_1 = "input-stream-1";
    private static final String INPUT_STREAM_2 = "input-stream-2";
    private static final String INPUT_STREAM_3 = "input-stream-3";
    private static final String OUTPUT_STREAM_1 = "output-stream-1";
    private static final String OUTPUT_STREAM_2 = "output-stream-2";
    private static final String OUTPUT_STREAM_3 = "output-stream-3";
    private static final String SUM_OUTPUT = "sum";
    private static final String COUNT_OUTPUT = "count";
    private static final String DELAYED_INPUT_STREAM_1 = "delayed-input-stream-1";
    private static final String DELAYED_INPUT_STREAM_2 = "delayed-input-stream-2";
    private static final String NEW_STREAM = "new-stream";
    private static final String EXISTING_STREAM = "existing-stream";
    private static final String SINGLE_PARTITION_INPUT_STREAM = "single-partition-input-stream";
    private static final String SINGLE_PARTITION_OUTPUT_STREAM = "single-partition-output-stream";
    private static Properties producerConfig;
    private static Properties consumerConfig;
    private String appId;
    private String changelog1;
    private String changelog2;
    private String changelog3;
    private static final String TOPIC_PREFIX = "unique_topic_prefix";
    private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    private Properties props;
    private Properties props2;
    private KafkaStreamsNamedTopologyWrapper streams;
    private KafkaStreamsNamedTopologyWrapper streams2;
    private NamedTopologyBuilder topology1Builder;
    private NamedTopologyBuilder topology1BuilderDup;
    private NamedTopologyBuilder topology2Builder;
    private NamedTopologyBuilder topology3Builder;
    private NamedTopologyBuilder topology1Builder2;
    private NamedTopologyBuilder topology2Builder2;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> IN_MEMORY_STORE = Materialized.as(Stores.inMemoryKeyValueStore("store"));
    private static final Materialized<Object, Long, KeyValueStore<Bytes, byte[]>> ROCKSDB_STORE = Materialized.as(Stores.persistentKeyValueStore("store"));
    private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA = Arrays.asList(KeyValue.pair("A", 100L), KeyValue.pair("B", 200L), KeyValue.pair("A", 300L), KeyValue.pair("C", 400L), KeyValue.pair("C", -50L));
    private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 1L), KeyValue.pair("A", 2L), KeyValue.pair("C", 2L));
    private static final List<KeyValue<String, Long>> SUM_OUTPUT_DATA = Arrays.asList(KeyValue.pair("B", 200L), KeyValue.pair("A", 400L), KeyValue.pair("C", 350L));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/NamedTopologyIntegrationTest$TrackingExceptionHandler.class */
    public static class TrackingExceptionHandler implements StreamsUncaughtExceptionHandler {
        private final Map<String, Queue<Throwable>> newErrorsByTopology;

        private TrackingExceptionHandler() {
            this.newErrorsByTopology = new HashMap();
        }

        public synchronized StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handle(Throwable th) {
            this.newErrorsByTopology.computeIfAbsent(((th instanceof StreamsException) && ((StreamsException) th).taskId().isPresent()) ? ((TaskId) ((StreamsException) th).taskId().get()).topologyName() : null, str -> {
                return new LinkedList();
            }).add(th);
            return th.getCause() instanceof MissingSourceTopicException ? StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD : StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
        }

        public synchronized Throwable nextError(String str) {
            if (this.newErrorsByTopology.containsKey(str)) {
                return this.newErrorsByTopology.get(str).poll();
            }
            return null;
        }
    }

    @BeforeAll
    public static void initializeClusterAndStandardTopics() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(INPUT_STREAM_3, 2, 1);
        producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class);
        consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
        produceToInputTopics(INPUT_STREAM_1, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_2, STANDARD_INPUT_DATA);
        produceToInputTopics(INPUT_STREAM_3, STANDARD_INPUT_DATA);
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    private Properties configProps(String str, String str2) {
        Properties properties = new Properties();
        properties.put("application.id", str);
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("state.dir", TestUtils.tempDirectory(str).getPath());
        properties.put("default.key.serde", Serdes.String().getClass());
        properties.put("default.value.serde", Serdes.Long().getClass());
        properties.put("num.stream.threads", 2);
        properties.put("application.server", str2 + ":2020");
        properties.put("commit.interval.ms", 1000L);
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", 10000);
        properties.put("__internal.override.topic.prefix__", TOPIC_PREFIX);
        return properties;
    }

    @BeforeEach
    public void setup(TestInfo testInfo) throws Exception {
        this.appId = IntegrationTestUtils.safeUniqueTestName((Class<?>) NamedTopologyIntegrationTest.class, testInfo);
        this.changelog1 = "unique_topic_prefix-topology-1-store-changelog";
        this.changelog2 = "unique_topic_prefix-topology-2-store-changelog";
        this.changelog3 = "unique_topic_prefix-topology-3-store-changelog";
        this.props = configProps(this.appId, "host1");
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        this.topology1Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology1BuilderDup = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology2Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_2);
        this.topology3Builder = this.streams.newNamedTopologyBuilder(TOPOLOGY_3);
        CLUSTER.createTopic(OUTPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_2, 2, 1);
        CLUSTER.createTopic(OUTPUT_STREAM_3, 2, 1);
    }

    private void setupSecondKafkaStreams() {
        this.props2 = configProps(this.appId, "host2");
        this.streams2 = new KafkaStreamsNamedTopologyWrapper(this.props2, this.clientSupplier);
        this.topology1Builder2 = this.streams2.newNamedTopologyBuilder(TOPOLOGY_1);
        this.topology2Builder2 = this.streams2.newNamedTopologyBuilder(TOPOLOGY_2);
    }

    @AfterEach
    public void shutdown() throws Exception {
        if (this.streams != null) {
            this.streams.close(Duration.ofSeconds(30L));
        }
        if (this.streams2 != null) {
            this.streams2.close(Duration.ofSeconds(30L));
        }
        CLUSTER.getAllTopicsInCluster().stream().filter(str -> {
            return str.contains("-changelog") || str.contains("-repartition");
        }).forEach(str2 -> {
            try {
                MatcherAssert.assertThat("topic was not decorated", str2.contains(TOPIC_PREFIX));
                CLUSTER.deleteTopicsAndWait(str2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        CLUSTER.deleteTopicsAndWait(OUTPUT_STREAM_1, OUTPUT_STREAM_2, OUTPUT_STREAM_3);
        CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
    }

    @Test
    public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exception {
        NamedTopologyBuilder newNamedTopologyBuilder = this.streams.newNamedTopologyBuilder("count-topology");
        newNamedTopologyBuilder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count();
        NamedTopologyBuilder newNamedTopologyBuilder2 = this.streams.newNamedTopologyBuilder("FKJ-topology");
        UniqueTopicSerdeScope uniqueTopicSerdeScope = new UniqueTopicSerdeScope();
        newNamedTopologyBuilder2.table(INPUT_STREAM_2, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, true), uniqueTopicSerdeScope.decorateSerde(Serdes.Long(), this.props, false))).join(newNamedTopologyBuilder2.table(INPUT_STREAM_3, Consumed.with(uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, true), uniqueTopicSerdeScope.decorateSerde(Serdes.Long(), this.props, false))), (v0) -> {
            return v0.toString();
        }, (l, l2) -> {
            return String.valueOf(l.longValue() + l2.longValue());
        }, Materialized.with((Serde) null, uniqueTopicSerdeScope.decorateSerde(Serdes.String(), this.props, false)));
        this.streams.addNamedTopology(newNamedTopologyBuilder2.build());
        this.streams.addNamedTopology(newNamedTopologyBuilder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        MatcherAssert.assertThat((Set) CLUSTER.getAllTopicsInCluster().stream().filter(str -> {
            return str.contains(TOPIC_PREFIX);
        }).filter(str2 -> {
            return str2.endsWith("-repartition") || str2.endsWith("-changelog") || str2.endsWith("-topic");
        }).collect(Collectors.toSet()), CoreMatchers.is(Utils.mkSet(new String[]{"unique_topic_prefix-count-topology-KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition", "unique_topic_prefix-count-topology-KSTREAM-AGGREGATE-STATE-STORE-0000000002-changelog", "unique_topic_prefix-FKJ-topology-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000006-topic", "unique_topic_prefix-FKJ-topology-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000014-topic", "unique_topic_prefix-FKJ-topology-KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-0000000010-changelog", "unique_topic_prefix-FKJ-topology-input-stream-2-STATE-STORE-0000000000-changelog", "unique_topic_prefix-FKJ-topology-input-stream-3-STATE-STORE-0000000003-changelog"})));
    }

    @Test
    public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).selectKey((obj, obj2) -> {
            return obj;
        }).groupByKey().count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams.addNamedTopology(this.topology1Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        Set<String> allTopicsInCluster = CLUSTER.getAllTopicsInCluster();
        MatcherAssert.assertThat(Boolean.valueOf(allTopicsInCluster.contains("unique_topic_prefix-topology-1-store-changelog")), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(allTopicsInCluster.contains("unique_topic_prefix-topology-1-store-repartition")), CoreMatchers.is(true));
    }

    @Test
    public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersistentStateStores() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams.addNamedTopology(this.topology2Builder.build());
        this.streams.addNamedTopology(this.topology3Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(Boolean.valueOf(CLUSTER.getAllTopicsInCluster().containsAll(Arrays.asList(this.changelog1, this.changelog2, this.changelog3))), CoreMatchers.is(true));
    }

    @Test
    public void shouldAddAndRemoveNamedTopologiesBeforeStartingAndRouteQueriesToCorrectTopology() throws Exception {
        try {
            CLUSTER.createTopic(SINGLE_PARTITION_INPUT_STREAM, 1, 1);
            CLUSTER.createTopic(SINGLE_PARTITION_OUTPUT_STREAM, 1, 1);
            produceToInputTopics(SINGLE_PARTITION_INPUT_STREAM, STANDARD_INPUT_DATA);
            this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
                return obj;
            }).count(Materialized.as("store-topology-1")).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder.stream(SINGLE_PARTITION_INPUT_STREAM).groupByKey().count(Materialized.as("store-topology-2")).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams.removeNamedTopology(TOPOLOGY_1);
            MatcherAssert.assertThat(this.streams.getTopologyByName(TOPOLOGY_1), CoreMatchers.is(Optional.empty()));
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams.addNamedTopology(this.topology2Builder.build());
            IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SINGLE_PARTITION_OUTPUT_STREAM, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(((ReadOnlyKeyValueStore) this.streams.store(NamedTopologyStoreQueryParameters.fromNamedTopologyAndStoreNameAndType(TOPOLOGY_1, "store-topology-1", QueryableStoreTypes.keyValueStore()))).get("A"), CoreMatchers.equalTo(2L));
            Collection streamsMetadataForStore = this.streams.streamsMetadataForStore("store-topology-1", TOPOLOGY_1);
            Collection streamsMetadataForStore2 = this.streams.streamsMetadataForStore("store-topology-2", TOPOLOGY_2);
            MatcherAssert.assertThat(Integer.valueOf(streamsMetadataForStore.size()), CoreMatchers.equalTo(1));
            MatcherAssert.assertThat(Integer.valueOf(streamsMetadataForStore2.size()), CoreMatchers.equalTo(1));
            KeyQueryMetadata queryMetadataForKey = this.streams.queryMetadataForKey("store-topology-1", "A", new StringSerializer(), TOPOLOGY_1);
            KeyQueryMetadata queryMetadataForKey2 = this.streams.queryMetadataForKey("store-topology-2", "A", new StringSerializer(), TOPOLOGY_2);
            MatcherAssert.assertThat(queryMetadataForKey, CoreMatchers.not(KeyQueryMetadata.NOT_AVAILABLE));
            MatcherAssert.assertThat(queryMetadataForKey, CoreMatchers.equalTo(queryMetadataForKey2));
            Map allLocalStorePartitionLagsForTopology = this.streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_1);
            Map allLocalStorePartitionLagsForTopology2 = this.streams.allLocalStorePartitionLagsForTopology(TOPOLOGY_2);
            MatcherAssert.assertThat(allLocalStorePartitionLagsForTopology.keySet(), CoreMatchers.equalTo(Collections.singleton("store-topology-1")));
            MatcherAssert.assertThat(((Map) allLocalStorePartitionLagsForTopology.get("store-topology-1")).keySet(), CoreMatchers.equalTo(Utils.mkSet(new Integer[]{0, 1})));
            MatcherAssert.assertThat(allLocalStorePartitionLagsForTopology2.keySet(), CoreMatchers.equalTo(Collections.singleton("store-topology-2")));
            MatcherAssert.assertThat(((Map) allLocalStorePartitionLagsForTopology2.get("store-topology-2")).keySet(), CoreMatchers.equalTo(Collections.singleton(0)));
            setupSecondKafkaStreams();
            this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((obj3, obj4) -> {
                return obj3;
            }).count(Materialized.as("store-topology-1")).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder2.stream(SINGLE_PARTITION_INPUT_STREAM).groupByKey().count(Materialized.as("store-topology-2")).toStream().to(SINGLE_PARTITION_OUTPUT_STREAM);
            this.streams2.start(Arrays.asList(this.topology1Builder2.build(), this.topology2Builder2.build()));
            IntegrationTestUtils.waitForApplicationState(Arrays.asList(this.streams, this.streams2), KafkaStreams.State.RUNNING, Duration.ofSeconds(60L));
            verifyMetadataForTopology(TOPOLOGY_1, this.streams.streamsMetadataForStore("store-topology-1", TOPOLOGY_1), this.streams2.streamsMetadataForStore("store-topology-1", TOPOLOGY_1));
            verifyMetadataForTopology(TOPOLOGY_2, this.streams.streamsMetadataForStore("store-topology-2", TOPOLOGY_2), this.streams2.streamsMetadataForStore("store-topology-2", TOPOLOGY_2));
            MatcherAssert.assertThat(Integer.valueOf(this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_1).size()), CoreMatchers.equalTo(2));
            MatcherAssert.assertThat(Integer.valueOf(this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_1).size()), CoreMatchers.equalTo(2));
            verifyMetadataForTopology(TOPOLOGY_1, this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_1), this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_1));
            MatcherAssert.assertThat(Integer.valueOf(this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_2).size()), CoreMatchers.equalTo(2));
            MatcherAssert.assertThat(Integer.valueOf(this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_2).size()), CoreMatchers.equalTo(2));
            verifyMetadataForTopology(TOPOLOGY_2, this.streams.allStreamsClientsMetadataForTopology(TOPOLOGY_2), this.streams2.allStreamsClientsMetadataForTopology(TOPOLOGY_2));
            CLUSTER.deleteTopics(SINGLE_PARTITION_INPUT_STREAM, SINGLE_PARTITION_OUTPUT_STREAM);
        } catch (Throwable th) {
            CLUSTER.deleteTopics(SINGLE_PARTITION_INPUT_STREAM, SINGLE_PARTITION_OUTPUT_STREAM);
            throw th;
        }
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams.start();
        this.streams.addNamedTopology(this.topology1Builder.build()).all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams.addNamedTopology(this.topology1Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        this.streams.addNamedTopology(this.topology2Builder.build()).all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(INPUT_STREAM_3).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams.addNamedTopology(this.topology2Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        this.streams.addNamedTopology(this.topology3Builder.build()).all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws Exception {
        setupSecondKafkaStreams();
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(INPUT_STREAM_2).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_2).groupBy((obj7, obj8) -> {
            return obj7;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams2.addNamedTopology(this.topology1Builder2.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((List<KafkaStreams>) Arrays.asList(this.streams, this.streams2));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        AddNamedTopologyResult addNamedTopology = this.streams.addNamedTopology(this.topology2Builder.build());
        AddNamedTopologyResult addNamedTopology2 = this.streams2.addNamedTopology(this.topology2Builder2.build());
        addNamedTopology.all().get();
        addNamedTopology2.all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets() throws Exception {
        setupSecondKafkaStreams();
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(INPUT_STREAM_1).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams2.addNamedTopology(this.topology1Builder2.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((List<KafkaStreams>) Arrays.asList(this.streams, this.streams2));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        RemoveNamedTopologyResult removeNamedTopology = this.streams.removeNamedTopology(TOPOLOGY_1, true);
        this.streams2.removeNamedTopology(TOPOLOGY_1, true).all().get();
        removeNamedTopology.all().get();
        MatcherAssert.assertThat(this.streams.getTopologyByName(TOPOLOGY_1), CoreMatchers.equalTo(Optional.empty()));
        MatcherAssert.assertThat(this.streams2.getTopologyByName(TOPOLOGY_1), CoreMatchers.equalTo(Optional.empty()));
        MatcherAssert.assertThat(Boolean.valueOf(this.streams.getAllTopologies().isEmpty()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(this.streams2.getAllTopologies().isEmpty()), CoreMatchers.is(true));
        this.streams.cleanUpNamedTopology(TOPOLOGY_1);
        this.streams2.cleanUpNamedTopology(TOPOLOGY_1);
        CLUSTER.getAllTopicsInCluster().stream().filter(str -> {
            return str.contains("-changelog");
        }).forEach(str2 -> {
            try {
                CLUSTER.deleteTopicAndWait(str2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        this.topology2Builder.stream(INPUT_STREAM_1).groupBy((obj5, obj6) -> {
            return obj5;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        this.topology2Builder2.stream(INPUT_STREAM_1).groupBy((obj7, obj8) -> {
            return obj7;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
        NamedTopology build = this.topology2Builder.build();
        NamedTopology build2 = this.topology2Builder2.build();
        AddNamedTopologyResult addNamedTopology = this.streams.addNamedTopology(build);
        this.streams2.addNamedTopology(build2).all().get();
        addNamedTopology.all().get();
        MatcherAssert.assertThat(this.streams.getAllTopologies(), CoreMatchers.equalTo(Collections.singleton(build)));
        MatcherAssert.assertThat(this.streams2.getAllTopologies(), CoreMatchers.equalTo(Collections.singleton(build2)));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws Exception {
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        try {
            this.topology1Builder.stream(DELAYED_INPUT_STREAM_1).groupBy((obj, obj2) -> {
                return obj;
            }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
            this.topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((obj3, obj4) -> {
                throw new IllegalStateException("Should not process any records for removed topology-2");
            });
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams.addNamedTopology(this.topology2Builder.build());
            IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
            this.streams.removeNamedTopology(TOPOLOGY_2).all().get();
            produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
            produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
        } catch (Throwable th) {
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
            throw th;
        }
    }

    @Test
    public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        try {
            KStream stream = this.topology1Builder.stream(INPUT_STREAM_1);
            stream.groupByKey().count().toStream().to(COUNT_OUTPUT);
            stream.groupByKey().reduce((v0, v1) -> {
                return Long.sum(v0, v1);
            }).toStream().to(SUM_OUTPUT);
            this.streams.addNamedTopology(this.topology1Builder.build());
            IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
            this.streams.removeNamedTopology(TOPOLOGY_1).all().get();
            this.streams.cleanUpNamedTopology(TOPOLOGY_1);
            NamedTopologyBuilder newNamedTopologyBuilder = this.streams.newNamedTopologyBuilder(TOPOLOGY_1);
            KStream stream2 = newNamedTopologyBuilder.stream(DELAYED_INPUT_STREAM_1);
            stream2.groupByKey().reduce((v0, v1) -> {
                return Long.sum(v0, v1);
            }).toStream().to(SUM_OUTPUT);
            stream2.groupByKey().count().toStream().to(COUNT_OUTPUT);
            produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA);
            this.streams.addNamedTopology(newNamedTopologyBuilder.build()).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
        } catch (Throwable th) {
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1);
            throw th;
        }
    }

    @Test
    public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(Pattern.compile(INPUT_STREAM_1)).groupBy((obj, obj2) -> {
            return obj;
        }).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((obj3, obj4) -> {
            return obj3;
        }).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((obj5, obj6) -> {
            return obj5;
        }).count().toStream().to(OUTPUT_STREAM_3);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams.addNamedTopology(this.topology2Builder.build());
        this.streams.addNamedTopology(this.topology3Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTopologies() throws Exception {
        this.topology1Builder.stream(INPUT_STREAM_1).groupBy((obj, obj2) -> {
            return obj;
        }).count().toStream().to(OUTPUT_STREAM_1);
        this.topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((obj3, obj4) -> {
            return obj3;
        }).count().toStream().to(OUTPUT_STREAM_2);
        this.topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((obj5, obj6) -> {
            return obj5;
        }).count().toStream().to(OUTPUT_STREAM_3);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams.addNamedTopology(this.topology2Builder.build());
        this.streams.addNamedTopology(this.topology3Builder.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
    }

    @Test
    public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        try {
            KStream stream = this.topology1Builder.stream(INPUT_STREAM_1);
            stream.groupByKey().count().toStream().to(COUNT_OUTPUT);
            stream.groupByKey().reduce((v0, v1) -> {
                return Long.sum(v0, v1);
            }).toStream().to(SUM_OUTPUT);
            this.streams.start();
            this.streams.addNamedTopology(this.topology1Builder.build()).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
            this.streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
            this.streams.cleanUpNamedTopology(TOPOLOGY_1);
            CLUSTER.getAllTopicsInCluster().stream().filter(str -> {
                return str.contains("changelog");
            }).forEach(str2 -> {
                try {
                    CLUSTER.deleteTopicAndWait(str2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            KStream stream2 = this.topology1BuilderDup.stream(INPUT_STREAM_1);
            stream2.groupByKey().count().toStream().to(COUNT_OUTPUT);
            stream2.groupByKey().reduce((v0, v1) -> {
                return Long.sum(v0, v1);
            }).toStream().to(SUM_OUTPUT);
            this.streams.addNamedTopology(this.topology1BuilderDup.build()).all().get();
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
        } catch (Throwable th) {
            CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
            throw th;
        }
    }

    @Test
    public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning() throws Exception {
        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
        KStream stream = this.topology1Builder.stream(INPUT_STREAM_1);
        stream.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().count().toStream().to(COUNT_OUTPUT);
        stream.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).toStream().to(SUM_OUTPUT);
        this.streams.start();
        this.streams.addNamedTopology(this.topology1Builder.build()).all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        this.streams.removeNamedTopology(TOPOLOGY_1, true).all().get();
        this.streams.cleanUpNamedTopology(TOPOLOGY_1);
        CLUSTER.getAllTopicsInCluster().stream().filter(str -> {
            return str.contains("-changelog") || str.contains("-repartition");
        }).forEach(str2 -> {
            try {
                CLUSTER.deleteTopicsAndWait(str2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        KStream stream2 = this.topology1BuilderDup.stream(INPUT_STREAM_1);
        stream2.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().count().toStream().to(COUNT_OUTPUT);
        stream2.map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }).groupByKey().reduce((v0, v1) -> {
            return Long.sum(v0, v1);
        }).toStream().to(SUM_OUTPUT);
        this.streams.addNamedTopology(this.topology1BuilderDup.build()).all().get();
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
        MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), CoreMatchers.equalTo(SUM_OUTPUT_DATA));
        CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
    }

    @Test
    public void shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() throws Exception {
        try {
            CLUSTER.createTopic(EXISTING_STREAM, 2, 1);
            produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA);
            setupSecondKafkaStreams();
            this.topology1Builder.stream(EXISTING_STREAM).groupBy((obj, obj2) -> {
                return obj;
            }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
            this.topology1Builder2.stream(EXISTING_STREAM).groupBy((obj3, obj4) -> {
                return obj3;
            }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
            TrackingExceptionHandler trackingExceptionHandler = new TrackingExceptionHandler();
            this.streams.setUncaughtExceptionHandler(trackingExceptionHandler);
            this.streams2.setUncaughtExceptionHandler(trackingExceptionHandler);
            this.streams.addNamedTopology(this.topology1Builder.build());
            this.streams2.addNamedTopology(this.topology1Builder2.build());
            IntegrationTestUtils.startApplicationAndWaitUntilRunning((List<KafkaStreams>) Arrays.asList(this.streams, this.streams2));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            this.topology2Builder.stream(NEW_STREAM).groupBy((obj5, obj6) -> {
                return obj5;
            }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
            this.topology2Builder2.stream(NEW_STREAM).groupBy((obj7, obj8) -> {
                return obj7;
            }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
            MatcherAssert.assertThat(trackingExceptionHandler.nextError(TOPOLOGY_2), CoreMatchers.nullValue());
            this.streams.addNamedTopology(this.topology2Builder.build());
            this.streams2.addNamedTopology(this.topology2Builder2.build());
            TestUtils.retryOnExceptionWithTimeout(() -> {
                Throwable nextError = trackingExceptionHandler.nextError(TOPOLOGY_2);
                MatcherAssert.assertThat(nextError, CoreMatchers.notNullValue());
                MatcherAssert.assertThat(nextError.getCause().getClass(), CoreMatchers.is(MissingSourceTopicException.class));
            });
            produceToInputTopics(EXISTING_STREAM, Collections.singletonList(KeyValue.pair("A", 30L)));
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 1), CoreMatchers.equalTo(Collections.singletonList(KeyValue.pair("A", 3L))));
            CLUSTER.createTopic(NEW_STREAM, 2, 1);
            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
            MatcherAssert.assertThat(IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(Integer.valueOf(this.streams.metadataForLocalThreads().size()), CoreMatchers.equalTo(2));
            MatcherAssert.assertThat(Integer.valueOf(this.streams2.metadataForLocalThreads().size()), CoreMatchers.equalTo(2));
            Set set = (Set) this.streams.metadataForLocalThreads().stream().map(threadMetadata -> {
                return ClientUtils.extractThreadId(threadMetadata.threadName());
            }).collect(Collectors.toSet());
            Set set2 = (Set) this.streams2.metadataForLocalThreads().stream().map(threadMetadata2 -> {
                return ClientUtils.extractThreadId(threadMetadata2.threadName());
            }).collect(Collectors.toSet());
            MatcherAssert.assertThat(Boolean.valueOf(set.contains("StreamThread-1")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set.contains("StreamThread-2")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set2.contains("StreamThread-1")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set2.contains("StreamThread-2")), CoreMatchers.is(true));
            CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
        } catch (Throwable th) {
            CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
            throw th;
        }
    }

    @Test
    public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
        setupSecondKafkaStreams();
        this.topology1Builder.stream(NEW_STREAM).groupBy((obj, obj2) -> {
            return obj;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        this.topology1Builder2.stream(NEW_STREAM).groupBy((obj3, obj4) -> {
            return obj3;
        }).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
        TrackingExceptionHandler trackingExceptionHandler = new TrackingExceptionHandler();
        this.streams.setUncaughtExceptionHandler(trackingExceptionHandler);
        this.streams2.setUncaughtExceptionHandler(trackingExceptionHandler);
        this.streams.addNamedTopology(this.topology1Builder.build());
        this.streams2.addNamedTopology(this.topology1Builder2.build());
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((List<KafkaStreams>) Arrays.asList(this.streams, this.streams2));
        TestUtils.retryOnExceptionWithTimeout(() -> {
            Throwable nextError = trackingExceptionHandler.nextError(TOPOLOGY_1);
            MatcherAssert.assertThat(nextError, CoreMatchers.notNullValue());
            MatcherAssert.assertThat(nextError.getCause().getClass(), CoreMatchers.is(MissingSourceTopicException.class));
        });
        try {
            CLUSTER.createTopic(NEW_STREAM, 2, 1);
            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
            List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3);
            waitUntilMinKeyValueRecordsReceived.retainAll(COUNT_OUTPUT_DATA);
            MatcherAssert.assertThat(waitUntilMinKeyValueRecordsReceived, CoreMatchers.equalTo(COUNT_OUTPUT_DATA));
            MatcherAssert.assertThat(Integer.valueOf(this.streams.metadataForLocalThreads().size()), CoreMatchers.equalTo(2));
            MatcherAssert.assertThat(Integer.valueOf(this.streams2.metadataForLocalThreads().size()), CoreMatchers.equalTo(2));
            Set set = (Set) this.streams.metadataForLocalThreads().stream().map(threadMetadata -> {
                return ClientUtils.extractThreadId(threadMetadata.threadName());
            }).collect(Collectors.toSet());
            Set set2 = (Set) this.streams2.metadataForLocalThreads().stream().map(threadMetadata2 -> {
                return ClientUtils.extractThreadId(threadMetadata2.threadName());
            }).collect(Collectors.toSet());
            MatcherAssert.assertThat(Boolean.valueOf(set.contains("StreamThread-1")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set.contains("StreamThread-2")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set2.contains("StreamThread-1")), CoreMatchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(set2.contains("StreamThread-2")), CoreMatchers.is(true));
            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
        } catch (Throwable th) {
            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
            throw th;
        }
    }

    @Test
    public void shouldBackOffTaskAndEmitDataWithinSameTopology() throws Exception {
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_1, 2, 1);
        CLUSTER.createTopic(DELAYED_INPUT_STREAM_2, 2, 1);
        try {
            AtomicInteger atomicInteger = new AtomicInteger(0);
            AtomicInteger atomicInteger2 = new AtomicInteger(0);
            this.props.put("statestore.cache.max.bytes", 0);
            this.props.put("commit.interval.ms", 15000L);
            this.props.put("state.dir", TestUtils.tempDirectory(this.appId).getPath());
            this.props.put("default.key.serde", Serdes.IntegerSerde.class);
            this.props.put("default.value.serde", Serdes.StringSerde.class);
            this.streams = new KafkaStreamsNamedTopologyWrapper(this.props);
            this.streams.setUncaughtExceptionHandler(th -> {
                return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            NamedTopologyBuilder newNamedTopologyBuilder = this.streams.newNamedTopologyBuilder("topology_A");
            newNamedTopologyBuilder.stream(DELAYED_INPUT_STREAM_1).peek((obj, obj2) -> {
                atomicInteger2.incrementAndGet();
            }).to(OUTPUT_STREAM_1);
            newNamedTopologyBuilder.stream(DELAYED_INPUT_STREAM_2).peek((obj3, obj4) -> {
                throw new RuntimeException("Kaboom");
            }).peek((obj5, obj6) -> {
                atomicInteger.incrementAndGet();
            }).to(OUTPUT_STREAM_2);
            this.streams.addNamedTopology(newNamedTopologyBuilder.build());
            IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams) this.streams);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(DELAYED_INPUT_STREAM_2, Collections.singletonList(new KeyValue(1, "A")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), 0L);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(DELAYED_INPUT_STREAM_1, Arrays.asList(new KeyValue(1, "A"), new KeyValue(1, "B")), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), 0L);
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, StringDeserializer.class), OUTPUT_STREAM_1, Arrays.asList(new KeyValue(1, "A"), new KeyValue(1, "B")));
            MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), CoreMatchers.equalTo(0));
            MatcherAssert.assertThat(Integer.valueOf(atomicInteger2.get()), CoreMatchers.equalTo(2));
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
        } catch (Throwable th2) {
            CLUSTER.deleteTopics(DELAYED_INPUT_STREAM_1, DELAYED_INPUT_STREAM_2);
            throw th2;
        }
    }

    private void verifyMetadataForTopology(String str, Collection<StreamsMetadata> collection, Collection<StreamsMetadata> collection2) {
        MatcherAssert.assertThat(Integer.valueOf(collection.size()), CoreMatchers.equalTo(Integer.valueOf(collection2.size())));
        Iterator<StreamsMetadata> it = collection.iterator();
        Iterator<StreamsMetadata> it2 = collection2.iterator();
        while (it.hasNext()) {
            StreamsMetadataImpl streamsMetadataImpl = (StreamsMetadataImpl) it.next();
            StreamsMetadataImpl streamsMetadataImpl2 = (StreamsMetadataImpl) it2.next();
            verifyPartitionsAndStoresForTopology(str, streamsMetadataImpl);
            verifyPartitionsAndStoresForTopology(str, streamsMetadataImpl2);
            MatcherAssert.assertThat(Boolean.valueOf(verifyEquivalentMetadataForHost(streamsMetadataImpl, streamsMetadataImpl2)), CoreMatchers.is(true));
        }
    }

    private void verifyPartitionsAndStoresForTopology(String str, StreamsMetadataImpl streamsMetadataImpl) {
        MatcherAssert.assertThat(streamsMetadataImpl.topologyName(), CoreMatchers.equalTo(str));
        MatcherAssert.assertThat(Boolean.valueOf(this.streams.getTopologyByName(str).isPresent()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(this.streams2.getTopologyByName(str).isPresent()), CoreMatchers.is(true));
        List sourceTopics = ((NamedTopology) this.streams.getTopologyByName(str).get()).sourceTopics();
        List sourceTopics2 = ((NamedTopology) this.streams2.getTopologyByName(str).get()).sourceTopics();
        MatcherAssert.assertThat(Boolean.valueOf(sourceTopics.containsAll((Collection) streamsMetadataImpl.topicPartitions().stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toList()))), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(sourceTopics2.containsAll((Collection) streamsMetadataImpl.topicPartitions().stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toList()))), CoreMatchers.is(true));
        if (streamsMetadataImpl.topicPartitions().isEmpty()) {
            MatcherAssert.assertThat(Boolean.valueOf(streamsMetadataImpl.stateStoreNames().isEmpty()), CoreMatchers.is(true));
        } else {
            MatcherAssert.assertThat(streamsMetadataImpl.stateStoreNames(), CoreMatchers.equalTo(Collections.singleton("store-" + str)));
        }
        MatcherAssert.assertThat(Boolean.valueOf(streamsMetadataImpl.standbyTopicPartitions().isEmpty()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(streamsMetadataImpl.standbyStateStoreNames().isEmpty()), CoreMatchers.is(true));
    }

    private static boolean verifyEquivalentMetadataForHost(StreamsMetadataImpl streamsMetadataImpl, StreamsMetadataImpl streamsMetadataImpl2) {
        return streamsMetadataImpl.hostInfo().equals(streamsMetadataImpl2.hostInfo()) && streamsMetadataImpl.stateStoreNames().equals(streamsMetadataImpl2.stateStoreNames()) && streamsMetadataImpl.topicPartitions().equals(streamsMetadataImpl2.topicPartitions()) && streamsMetadataImpl.standbyStateStoreNames().equals(streamsMetadataImpl2.standbyStateStoreNames()) && streamsMetadataImpl.standbyTopicPartitions().equals(streamsMetadataImpl2.standbyTopicPartitions());
    }

    private static void produceToInputTopics(String str, Collection<KeyValue<String, Long>> collection) {
        IntegrationTestUtils.produceKeyValuesSynchronously(str, collection, producerConfig, CLUSTER.time);
    }
}
