package org.apache.kafka.streams.kstream.internals;

import java.util.Arrays;
import java.util.HashMap;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.ForeachAction;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.Serialized;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockReducer;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImplTest.class */
public class SessionWindowedKStreamImplTest {
    private static final String TOPIC = "input";
    private final StreamsBuilder builder = new StreamsBuilder();

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();
    private final Merger<String, String> sessionMerger = new Merger<String, String>() { // from class: org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImplTest.1
        public String apply(String str, String str2, String str3) {
            return str2 + "+" + str3;
        }
    };
    private SessionWindowedKStream<String, String> stream;

    @Before
    public void before() {
        this.stream = this.builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Serialized.with(Serdes.String(), Serdes.String())).windowedBy(SessionWindows.with(500L));
    }

    @Test
    public void shouldCountSessionWindowed() {
        final HashMap hashMap = new HashMap();
        this.stream.count().toStream().foreach(new ForeachAction<Windowed<String>, Long>() { // from class: org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImplTest.2
            public void apply(Windowed<String> windowed, Long l) {
                hashMap.put(windowed, l);
            }
        });
        processData();
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(10L, 15L))), CoreMatchers.equalTo(2L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("2", new SessionWindow(600L, 600L))), CoreMatchers.equalTo(1L));
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(600L, 600L))), CoreMatchers.equalTo(1L));
    }

    @Test
    public void shouldReduceWindowed() {
        final HashMap hashMap = new HashMap();
        this.stream.reduce(MockReducer.STRING_ADDER).toStream().foreach(new ForeachAction<Windowed<String>, String>() { // from class: org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImplTest.3
            public void apply(Windowed<String> windowed, String str) {
                hashMap.put(windowed, str);
            }
        });
        processData();
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(10L, 15L))), CoreMatchers.equalTo("1+2"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("2", new SessionWindow(600L, 600L))), CoreMatchers.equalTo("1"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(600L, 600L))), CoreMatchers.equalTo("3"));
    }

    @Test
    public void shouldAggregateSessionWindowed() {
        final HashMap hashMap = new HashMap();
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger).toStream().foreach(new ForeachAction<Windowed<String>, String>() { // from class: org.apache.kafka.streams.kstream.internals.SessionWindowedKStreamImplTest.4
            public void apply(Windowed<String> windowed, String str) {
                hashMap.put(windowed, str);
            }
        });
        processData();
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(10L, 15L))), CoreMatchers.equalTo("0+0+1+2"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("2", new SessionWindow(600L, 600L))), CoreMatchers.equalTo("0+1"));
        MatcherAssert.assertThat(hashMap.get(new Windowed("1", new SessionWindow(600L, 600L))), CoreMatchers.equalTo("0+3"));
    }

    @Test
    public void shouldMaterializeCount() {
        this.stream.count(Materialized.as("count-store").withKeySerde(Serdes.String()));
        processData();
        MatcherAssert.assertThat(StreamsTestUtils.toList(this.driver.allStateStores().get("count-store").fetch("1", "2")), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new SessionWindow(10L, 15L)), 2L), KeyValue.pair(new Windowed("1", new SessionWindow(600L, 600L)), 1L), KeyValue.pair(new Windowed("2", new SessionWindow(600L, 600L)), 1L))));
    }

    @Test
    public void shouldMaterializeWithoutSpecifyingSerdes() {
        this.stream.count(Materialized.as("count-store"));
        processData();
        MatcherAssert.assertThat(StreamsTestUtils.toList(this.driver.allStateStores().get("count-store").fetch("1", "2")), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new SessionWindow(10L, 15L)), 2L), KeyValue.pair(new Windowed("1", new SessionWindow(600L, 600L)), 1L), KeyValue.pair(new Windowed("2", new SessionWindow(600L, 600L)), 1L))));
    }

    @Test
    public void shouldMaterializeReduced() {
        this.stream.reduce(MockReducer.STRING_ADDER, Materialized.as("reduced").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        processData();
        MatcherAssert.assertThat(StreamsTestUtils.toList(this.driver.allStateStores().get("reduced").fetch("1", "2")), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new SessionWindow(10L, 15L)), "1+2"), KeyValue.pair(new Windowed("1", new SessionWindow(600L, 600L)), "3"), KeyValue.pair(new Windowed("2", new SessionWindow(600L, 600L)), "1"))));
    }

    @Test
    public void shouldMaterializeAggregated() {
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.as("aggregated").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()));
        processData();
        MatcherAssert.assertThat(StreamsTestUtils.toList(this.driver.allStateStores().get("aggregated").fetch("1", "2")), CoreMatchers.equalTo(Arrays.asList(KeyValue.pair(new Windowed("1", new SessionWindow(10L, 15L)), "0+0+1+2"), KeyValue.pair(new Windowed("1", new SessionWindow(600L, 600L)), "0+3"), KeyValue.pair(new Windowed("2", new SessionWindow(600L, 600L)), "0+1"))));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfInitializerIsNull() {
        this.stream.aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, this.sessionMerger);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfAggregatorIsNull() {
        this.stream.aggregate(MockInitializer.STRING_INIT, (Aggregator) null, this.sessionMerger);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnAggregateIfMergerIsNull() {
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnReduceIfReducerIsNull() {
        this.stream.reduce((Reducer) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfInitializerIsNull() {
        this.stream.aggregate((Initializer) null, MockAggregator.TOSTRING_ADDER, this.sessionMerger, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfAggregatorIsNull() {
        this.stream.aggregate(MockInitializer.STRING_INIT, (Aggregator) null, this.sessionMerger, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfMergerIsNull() {
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Merger) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedAggregateIfMaterializedIsNull() {
        this.stream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, this.sessionMerger, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfReducerIsNull() {
        this.stream.reduce((Reducer) null, Materialized.as("store"));
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnMaterializedReduceIfMaterializedIsNull() {
        this.stream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
    }

    @Test(expected = NullPointerException.class)
    public void shouldThrowNullPointerOnCountIfMaterializedIsNull() {
        this.stream.count((Materialized) null);
    }

    private void processData() {
        this.driver.setUp(this.builder, TestUtils.tempDirectory(), 0L);
        this.driver.setTime(10L);
        this.driver.process(TOPIC, "1", "1");
        this.driver.setTime(15L);
        this.driver.process(TOPIC, "1", "2");
        this.driver.setTime(600L);
        this.driver.process(TOPIC, "1", "3");
        this.driver.process(TOPIC, "2", "1");
        this.driver.flushState();
    }
}
