/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NamedTopologyTest {
    final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier();
    final Properties props = NamedTopologyTest.configProps();
    final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1");
    final NamedTopologyStreamsBuilder builder2 = new NamedTopologyStreamsBuilder("topology-2");
    final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3");
    KafkaStreamsNamedTopologyWrapper streams;

    @Before
    public void setup() {
        this.builder1.stream("input-1");
        this.builder2.stream("input-2");
        this.builder3.stream("input-3");
    }

    @After
    public void cleanup() {
        if (this.streams != null) {
            this.streams.close();
        }
    }

    private static Properties configProps() {
        Properties props = new Properties();
        props.put("application.id", "Named-Topology-App");
        props.put("bootstrap.servers", "localhost:2018");
        props.put("state.dir", TestUtils.tempDirectory().getPath());
        return props;
    }

    @Test
    public void shouldThrowIllegalArgumentOnIllegalName() {
        Assert.assertThrows(IllegalArgumentException.class, () -> new NamedTopologyStreamsBuilder("__not-allowed__"));
    }

    @Test
    public void shouldBuildSingleNamedTopology() {
        this.builder1.stream("stream-1").filter((k, v) -> !k.equals(v)).to("output-1");
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.builder1.buildNamedTopology(this.props), this.props, this.clientSupplier);
    }

    @Test
    public void shouldBuildMultipleIdenticalNamedTopologyWithRepartition() {
        this.builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-1");
        this.builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-2");
        this.builder3.stream("stream-3").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-3");
        this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props), this.builder3.buildNamedTopology(this.props)), this.props, this.clientSupplier);
    }

    @Test
    public void shouldReturnTopologyByName() {
        NamedTopology topology1 = this.builder1.buildNamedTopology(this.props);
        NamedTopology topology2 = this.builder2.buildNamedTopology(this.props);
        NamedTopology topology3 = this.builder3.buildNamedTopology(this.props);
        this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(topology1, topology2, topology3), this.props, this.clientSupplier);
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-1").get(), (Matcher)CoreMatchers.equalTo((Object)topology1));
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-2").get(), (Matcher)CoreMatchers.equalTo((Object)topology2));
        MatcherAssert.assertThat(this.streams.getTopologyByName("topology-3").get(), (Matcher)CoreMatchers.equalTo((Object)topology3));
    }

    @Test
    public void shouldReturnEmptyWhenLookingUpNonExistentTopologyByName() {
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.builder1.buildNamedTopology(this.props), this.props, this.clientSupplier);
        MatcherAssert.assertThat((Object)this.streams.getTopologyByName("non-existent-topology").isPresent(), (Matcher)CoreMatchers.equalTo((Object)false));
    }

    @Test
    public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() {
        this.builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as((KeyValueBytesStoreSupplier)Stores.inMemoryKeyValueStore((String)"store")));
        this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() {
        this.builder1.stream("stream");
        this.builder2.stream("stream");
        Assert.assertThrows(TopologyException.class, () -> {
            this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        });
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() {
        this.builder1.table("table");
        this.builder2.table("table");
        Assert.assertThrows(TopologyException.class, () -> {
            this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        });
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamAndTableFromSameInputTopic() {
        this.builder1.stream("input");
        this.builder2.table("input");
        Assert.assertThrows(TopologyException.class, () -> {
            this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        });
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromOverlappingInputTopicCollection() {
        this.builder1.stream("stream");
        this.builder2.stream(Arrays.asList("unique-input", "stream"));
        Assert.assertThrows(TopologyException.class, () -> {
            this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        });
    }

    @Test
    public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSamePattern() {
        this.builder1.stream(Pattern.compile("some-regex"));
        this.builder2.stream(Pattern.compile("some-regex"));
        Assert.assertThrows(TopologyException.class, () -> {
            this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        });
    }

    @Test
    public void shouldDescribeWithSingleNamedTopology() {
        this.builder1.stream("input").filter((k, v) -> !k.equals(v)).to("output");
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.builder1.buildNamedTopology(this.props), this.props, this.clientSupplier);
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)"Topology - topology-1:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [input])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output)\n      <-- KSTREAM-FILTER-0000000002\n\n"));
    }

    @Test
    public void shouldDescribeWithMultipleNamedTopologies() {
        this.builder1.stream("stream-1").filter((k, v) -> !k.equals(v)).to("output-1");
        this.builder2.stream("stream-2").filter((k, v) -> !k.equals(v)).to("output-2");
        this.builder3.stream("stream-3").filter((k, v) -> !k.equals(v)).to("output-3");
        this.streams = new KafkaStreamsNamedTopologyWrapper(Arrays.asList(this.builder1.buildNamedTopology(this.props), this.builder2.buildNamedTopology(this.props), this.builder3.buildNamedTopology(this.props)), this.props, this.clientSupplier);
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)"Topology - topology-1:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-1])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-1)\n      <-- KSTREAM-FILTER-0000000002\n\nTopology - topology-2:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-2])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-2])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-2)\n      <-- KSTREAM-FILTER-0000000002\n\nTopology - topology-3:\n   Sub-topology: 0\n    Source: KSTREAM-SOURCE-0000000000 (topics: [input-3])\n      --> none\n\n  Sub-topology: 1\n    Source: KSTREAM-SOURCE-0000000001 (topics: [stream-3])\n      --> KSTREAM-FILTER-0000000002\n    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n      --> KSTREAM-SINK-0000000003\n      <-- KSTREAM-SOURCE-0000000001\n    Sink: KSTREAM-SINK-0000000003 (topic: output-3)\n      <-- KSTREAM-FILTER-0000000002\n\n"));
    }

    @Test
    public void shouldDescribeWithEmptyNamedTopology() {
        this.streams = new KafkaStreamsNamedTopologyWrapper(this.props, this.clientSupplier);
        MatcherAssert.assertThat((Object)this.streams.getFullTopologyDescription(), (Matcher)CoreMatchers.equalTo((Object)""));
    }
}

