package org.apache.kafka.streams;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest.class */
public class KafkaStreamsTest {
    // Value handed to the EmbeddedKafkaCluster constructor below (broker count, going by the name).
    private static final int NUM_BROKERS = 1;

    // Embedded Kafka cluster shared by all tests in this class; managed by JUnit as a class rule.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // Topology builder used for the KafkaStreams instance created in before(); no sources are added to it here.
    private final KStreamBuilder builder = new KStreamBuilder();
    // Instance under test; re-created by before() for each test method.
    private KafkaStreams streams;
    // Configuration the instance under test is built from; re-created by before().
    private Properties props;

    /* loaded from: input_file:org/apache/kafka/streams/KafkaStreamsTest$StateListenerStub.class */
    public static class StateListenerStub implements KafkaStreams.StateListener {
        public KafkaStreams.State oldState;
        public KafkaStreams.State newState;
        public int numChanges = 0;
        public Map<KafkaStreams.State, Long> mapStates = new HashMap();

        public void onChange(KafkaStreams.State state, KafkaStreams.State state2) {
            long longValue = this.mapStates.containsKey(state) ? this.mapStates.get(state).longValue() : 0L;
            this.numChanges += KafkaStreamsTest.NUM_BROKERS;
            this.oldState = state2;
            this.newState = state;
            this.mapStates.put(state, Long.valueOf(longValue + 1));
        }
    }

    /**
     * Prepares a fresh configuration and a fresh, not yet started {@link KafkaStreams} instance
     * before each test. The configuration points at the embedded cluster, registers
     * {@link MockMetricsReporter} as metrics reporter and uses a temporary state directory.
     */
    @Before
    public void before() {
        final Properties config = new Properties();
        this.props = config;
        config.setProperty("application.id", "appId");
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("metric.reporters", MockMetricsReporter.class.getName());
        config.setProperty("state.dir", TestUtils.tempDirectory().getPath());
        this.streams = new KafkaStreams(this.builder, config);
    }

    /**
     * Exercises the metrics-reporter and state-listener lifecycle of one instance:
     * <ul>
     *   <li>constructing the instance must raise {@code MockMetricsReporter.INIT_COUNT};</li>
     *   <li>{@code start()} must move the instance from CREATED to RUNNING with exactly one listener callback;</li>
     *   <li>{@code close()} must raise {@code MockMetricsReporter.CLOSE_COUNT} by the same amount the
     *       init count rose, and leave the instance NOT_RUNNING with one recorded entry into that state.</li>
     * </ul>
     */
    @Test
    public void testInitializesAndDestroysMetricsReporters() throws Exception {
        final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
        final KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), this.props);
        // Number of reporter initialisations caused by the constructor; reused for the close check below.
        final int initDiff = MockMetricsReporter.INIT_COUNT.get() - oldInitCount;
        Assert.assertTrue("some reporters should be initialized by calling on construction", initDiff > 0);

        final StateListenerStub stateListener = new StateListenerStub();
        kafkaStreams.setStateListener(stateListener);
        Assert.assertEquals(KafkaStreams.State.CREATED, kafkaStreams.state());
        Assert.assertEquals(0, stateListener.numChanges);

        kafkaStreams.start();
        Assert.assertEquals(KafkaStreams.State.RUNNING, kafkaStreams.state());
        Assert.assertEquals(1, stateListener.numChanges);
        Assert.assertEquals(KafkaStreams.State.CREATED, stateListener.oldState);
        Assert.assertEquals(KafkaStreams.State.RUNNING, stateListener.newState);
        Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue());

        final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
        kafkaStreams.close();
        // Every reporter initialised by the constructor is expected to be closed again.
        Assert.assertEquals(oldCloseCount + initDiff, MockMetricsReporter.CLOSE_COUNT.get());
        Assert.assertEquals(KafkaStreams.State.NOT_RUNNING, kafkaStreams.state());
        Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.RUNNING).longValue());
        Assert.assertEquals(1L, stateListener.mapStates.get(KafkaStreams.State.NOT_RUNNING).longValue());
    }

    /**
     * Closing an instance a second time must not change
     * {@code MockMetricsReporter.CLOSE_COUNT} any further.
     */
    @Test
    public void testCloseIsIdempotent() throws Exception {
        this.streams.close();
        final int closeCountAfterFirstClose = MockMetricsReporter.CLOSE_COUNT.get();

        this.streams.close();
        final int closeCountAfterSecondClose = MockMetricsReporter.CLOSE_COUNT.get();

        Assert.assertEquals(
            "subsequent close() calls should do nothing",
            closeCountAfterFirstClose,
            closeCountAfterSecondClose);
    }

    /**
     * Calling {@code start()} on an instance that has already been started and closed must fail
     * with an {@link IllegalStateException} whose message is {@code "Cannot start again."}.
     * The exception is rethrown so the {@code expected} clause of the annotation can verify it.
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotStartOnceClosed() throws Exception {
        this.streams.start();
        this.streams.close();
        try {
            this.streams.start();
        } catch (final IllegalStateException e) {
            Assert.assertEquals("Cannot start again.", e.getMessage());
            throw e;
        } finally {
            // Always release the instance, whether or not the second start() failed as expected.
            this.streams.close();
        }
    }

    /**
     * Calling {@code start()} a second time on a started instance must fail with an
     * {@link IllegalStateException} whose message is {@code "Cannot start again."}.
     * The exception is rethrown so the {@code expected} clause of the annotation can verify it.
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotStartTwice() throws Exception {
        this.streams.start();
        try {
            this.streams.start();
        } catch (final IllegalStateException e) {
            Assert.assertEquals("Cannot start again.", e.getMessage());
            throw e;
        } finally {
            // Always shut the started instance down, whether or not the second start() failed as expected.
            this.streams.close();
        }
    }

    /**
     * An instance built by {@link #createKafkaStreams()} must expose exactly 16 metrics
     * before it is started.
     */
    @Test
    public void testNumberDefaultMetrics() {
        final KafkaStreams defaultStreams = createKafkaStreams();
        try {
            Assert.assertEquals(16, defaultStreams.metrics().size());
        } finally {
            // Release the instance even when the assertion fails.
            defaultStreams.close();
        }
    }

    /**
     * Constructing an instance with an unrecognised {@code metrics.recording.level} value
     * must be rejected with a {@link ConfigException}.
     */
    @Test(expected = ConfigException.class)
    public void testIllegalMetricsConfig() {
        final Properties invalidConfig = new Properties();
        invalidConfig.setProperty("application.id", "appId");
        invalidConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        invalidConfig.setProperty("metrics.recording.level", "illegalConfig");

        final KStreamBuilder emptyTopology = new KStreamBuilder();
        new KafkaStreams(emptyTopology, invalidConfig);
    }

    /**
     * Constructing an instance must succeed for both the INFO and the DEBUG
     * {@code metrics.recording.level}. Each instance is closed right after construction so that
     * neither one outlives the test.
     */
    @Test
    public void testLegalMetricsConfig() {
        final Properties properties = new Properties();
        properties.setProperty("application.id", "appId");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());

        properties.setProperty("metrics.recording.level", Sensor.RecordingLevel.INFO.toString());
        new KafkaStreams(new KStreamBuilder(), properties).close();

        properties.setProperty("metrics.recording.level", Sensor.RecordingLevel.DEBUG.toString());
        new KafkaStreams(new KStreamBuilder(), properties).close();
    }

    /**
     * Asking a never-started instance for all metadata must raise an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
        // The instance from before() has not been started at this point.
        streams.allMetadata();
    }

    /**
     * Asking a never-started instance for the metadata of a named store must raise an
     * {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
        // The instance from before() has not been started at this point.
        streams.allMetadataForStore("store");
    }

    /**
     * Looking up the metadata for a key with a key serializer on a never-started instance
     * must raise an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
        // The instance from before() has not been started at this point.
        streams.metadataForKey("store", "key", Serdes.String().serializer());
    }

    /**
     * Looking up the metadata for a key with a custom partitioner on a never-started instance
     * must raise an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
        // Trivial partitioner that sends every record to partition 0.
        final StreamPartitioner<String, Object> alwaysPartitionZero = new StreamPartitioner<String, Object>() {
            @Override
            public Integer partition(final String key, final Object value, final int numPartitions) {
                return 0;
            }
        };
        this.streams.metadataForKey("store", "key", alwaysPartitionZero);
    }

    /**
     * {@code close(timeout, unit)} must return {@code false} when processing has not finished
     * within the timeout.
     *
     * <p>The topology's {@code foreach} action signals a latch on the first record and then spins
     * until {@code keepRunning} is cleared. The test produces one record, waits for the latch, and
     * then expects a 10 ms close to report failure. The flag is always cleared at the end so the
     * blocked action can return.
     */
    @Test
    public void shouldReturnFalseOnCloseWhenThreadsHaventTerminated() throws Exception {
        final AtomicBoolean keepRunning = new AtomicBoolean(true);
        try {
            final Properties properties = new Properties();
            properties.setProperty("application.id", "appId");
            properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
            properties.setProperty("auto.offset.reset", "earliest");

            final KStreamBuilder kStreamBuilder = new KStreamBuilder();
            // Exactly one record is produced below, so the latch waits for exactly one countDown().
            final CountDownLatch recordSeen = new CountDownLatch(1);
            final String topic = "input";
            CLUSTER.createTopic(topic);

            kStreamBuilder.stream(Serdes.String(), Serdes.String(), topic)
                .foreach(new ForeachAction<String, String>() {
                    @Override
                    public void apply(final String key, final String value) {
                        try {
                            recordSeen.countDown();
                            while (keepRunning.get()) {
                                Thread.sleep(10L);
                            }
                        } catch (final InterruptedException e) {
                            // Stop blocking, but keep the interrupt visible to the calling thread.
                            Thread.currentThread().interrupt();
                        }
                    }
                });

            final KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, properties);
            kafkaStreams.start();
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
                topic,
                Collections.singletonList(new KeyValue<String, String>("A", "A")),
                TestUtils.producerConfig(
                    CLUSTER.bootstrapServers(),
                    StringSerializer.class,
                    StringSerializer.class,
                    new Properties()),
                System.currentTimeMillis());

            Assert.assertTrue("Timed out waiting to receive single message", recordSeen.await(30L, TimeUnit.SECONDS));
            Assert.assertFalse(kafkaStreams.close(10L, TimeUnit.MILLISECONDS));
        } finally {
            // Let the blocked foreach action return regardless of the test outcome.
            keepRunning.set(false);
        }
    }

    /**
     * Builds a not yet started {@link KafkaStreams} instance with an empty topology and only the
     * application id and bootstrap servers configured.
     *
     * @return the newly constructed instance
     */
    private KafkaStreams createKafkaStreams() {
        final Properties minimalConfig = new Properties();
        minimalConfig.setProperty("application.id", "appId");
        minimalConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());

        final KStreamBuilder emptyTopology = new KStreamBuilder();
        return new KafkaStreams(emptyTopology, minimalConfig);
    }

    /**
     * {@code cleanUp()} must complete without throwing both before the instance is started
     * and after it has been closed.
     */
    @Test
    public void testCleanup() throws Exception {
        final Properties config = new Properties();
        config.setProperty("application.id", "testLocalCleanup");
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());

        final KStreamBuilder emptyTopology = new KStreamBuilder();
        final KafkaStreams instance = new KafkaStreams(emptyTopology, config);

        // First clean-up happens before start().
        instance.cleanUp();
        instance.start();
        instance.close();
        // Second clean-up happens after close().
        instance.cleanUp();
    }

    /**
     * Calling {@code cleanUp()} on a started instance must fail with an
     * {@link IllegalStateException} whose message is {@code "Cannot clean up while running."}.
     * The exception is rethrown so the {@code expected} clause of the annotation can verify it.
     */
    @Test(expected = IllegalStateException.class)
    public void testCannotCleanupWhileRunning() throws Exception {
        final Properties properties = new Properties();
        properties.setProperty("application.id", "testCannotCleanupWhileRunning");
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());

        final KafkaStreams kafkaStreams = new KafkaStreams(new KStreamBuilder(), properties);
        kafkaStreams.start();
        try {
            kafkaStreams.cleanUp();
        } catch (final IllegalStateException e) {
            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
            throw e;
        } finally {
            // Always shut the started instance down, whether or not cleanUp() failed as expected.
            kafkaStreams.close();
        }
    }

    /**
     * {@code toString()} of a running instance must be non-null and non-empty, and the text after
     * the first {@code ':'} on its second line must be non-empty. The assertion messages refer to
     * that text as the application id.
     */
    @Test
    public void testToString() {
        this.streams.start();
        final String streamString;
        try {
            streamString = this.streams.toString();
        } finally {
            // Shut the instance down even if toString() throws.
            this.streams.close();
        }

        // Validate the raw string before parsing it, so a null or empty result fails with a clear message.
        Assert.assertNotNull("streamString should not be null", streamString);
        Assert.assertNotEquals("streamString should not be empty", "", streamString);

        // Second line, second ':'-separated field.
        final String appId = streamString.split("\\n")[1].split(":")[1].trim();
        Assert.assertNotNull("streamString contains non-null appId", appId);
        Assert.assertNotEquals("streamString contains non-empty appId", "", appId);
    }
}
