/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class KStreamSessionWindowAggregateProcessorTest {
    /** Session inactivity gap in milliseconds (300000 ms, the value the fixture's session windows use). */
    private static final long GAP_MS = 300000L;
    /** Name of the session store the processor under test reads and writes. */
    private static final String STORE_NAME = "session-store";

    // Clock and metrics registry backing the mock processor context.
    private final MockTime time = new MockTime();
    private final Metrics metrics = new Metrics();
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "test", "latest", time);
    // Used as the "thread-id" tag when looking up task-level metrics.
    private final String threadId = Thread.currentThread().getName();

    // Aggregation functions: start at 0, count one per record, add counts on merge.
    private final Initializer<Long> initializer = () -> 0L;
    private final Aggregator<String, String, Long> aggregator = (key, value, count) -> count + 1L;
    private final Merger<String, Long> sessionMerger = (key, leftCount, rightCount) -> leftCount + rightCount;

    // Every record forwarded by the processor is captured here by the mock context.
    private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>();

    // Fixture objects, (re)created by setup(boolean) and initStore(boolean).
    private InternalMockProcessorContext<Windowed<String>, Change<Long>> context;
    private KStreamSessionWindowAggregate<String, String, Long> sessionAggregator;
    private Processor<String, String, Windowed<String>, Change<Long>> processor;
    private SessionStore<String, Long> sessionStore;

    /** Emit strategy type injected by the Parameterized runner. */
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;
    // Derived from {@link #type} in setup(boolean).
    private EmitStrategy emitStrategy;
    private boolean emitFinal;

    @Parameterized.Parameters(name="{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList({EmitStrategy.StrategyType.ON_WINDOW_UPDATE}, {EmitStrategy.StrategyType.ON_WINDOW_CLOSE});
    }

    /** Builds the default fixture before each test, requesting a caching store. */
    @Before
    public void setup() {
        final boolean enableCache = true;
        setup(enableCache);
    }

    /**
     * (Re)creates the whole fixture: mock processor context, session-window aggregate,
     * its processor and the backing session store.
     *
     * <p>May be called again from within a test; a previously created processor is closed
     * before it is replaced.
     *
     * @param enableCache whether caching is requested for the session store
     *                    (passed through to {@code initStore})
     */
    private void setup(final boolean enableCache) {
        final Properties prop = StreamsTestUtils.getStreamsConfig();
        prop.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
        final StreamsConfig config = new StreamsConfig(prop);

        context = new InternalMockProcessorContext<Windowed<String>, Change<Long>>(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.String(),
            streamsMetrics,
            config,
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 100000L, streamsMetrics),
            time
        ) {

            @Override
            public <K extends Windowed<String>, V extends Change<Long>> void forward(final Record<K, V> record) {
                // Capture every forwarded record so the tests can assert on the emitted results.
                // The diamond lets the element type be inferred from the results list; an explicit
                // <Object, Object> instantiation is not assignable to the list's element type.
                KStreamSessionWindowAggregateProcessorTest.this.results.add(
                    new KeyValueTimestamp<>(record.key(), record.value(), record.timestamp()));
            }
        };

        emitFinal = type.equals(EmitStrategy.StrategyType.ON_WINDOW_CLOSE);
        emitStrategy = EmitStrategy.StrategyType.forType(type);
        sessionAggregator = new KStreamSessionWindowAggregate<>(
            SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMillis(GAP_MS)),
            STORE_NAME,
            emitStrategy,
            initializer,
            aggregator,
            sessionMerger
        );

        // Release the processor of a previous setup call before replacing it.
        if (processor != null) {
            processor.close();
        }
        processor = sessionAggregator.get();
        context.setTime(0L);
        // NOTE(review): presumably registers the task-level dropped-records sensor so the
        // metrics asserted by the tests exist up front; the returned sensor is not needed here.
        TaskMetrics.droppedRecordsSensor(threadId, context.taskId().toString(), streamsMetrics);
        initStore(enableCache);
        processor.init(context);
    }

    /**
     * Builds and initializes the session store used by the processor, closing any store
     * created by an earlier call.
     *
     * <p>For {@code ON_WINDOW_CLOSE} a {@link RocksDbTimeOrderedSessionBytesStoreSupplier} is used
     * and caching is never enabled; otherwise the store comes from
     * {@link Stores#persistentSessionStore(String, Duration)}. Retention is three times the gap.
     *
     * @param enableCaching whether to enable caching on the store (ignored for {@code ON_WINDOW_CLOSE})
     */
    private void initStore(final boolean enableCaching) {
        final boolean onWindowClose = emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE;
        final long retentionMs = 3 * GAP_MS;

        // Declared as the common interface: the two branches produce different supplier implementations.
        final SessionBytesStoreSupplier supplier = onWindowClose
            ? new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, retentionMs, true)
            : Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(retentionMs));

        final StoreBuilder<SessionStore<String, Long>> storeBuilder =
            Stores.sessionStoreBuilder(supplier, Serdes.String(), Serdes.Long()).withLoggingDisabled();
        if (enableCaching && !onWindowClose) {
            storeBuilder.withCachingEnabled();
        }

        // Release the store of a previous call before replacing it.
        if (sessionStore != null) {
            sessionStore.close();
        }
        sessionStore = storeBuilder.build();
        sessionStore.init(context, sessionStore);
    }

    /** Releases the session store and then the processor after each test. */
    @After
    public void closeStore() {
        sessionStore.close();
        processor.close();
    }

    /** Two records for one key, 500 ms apart, must end up in a single session with count 2. */
    @Test
    public void shouldCreateSingleSessionWhenWithinGap() {
        final String key = "john";
        processor.process(new Record<>(key, "first", 0L));
        processor.process(new Record<>(key, "second", 500L));

        try (final KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions(key, 0L, 2000L)) {
            Assert.assertTrue(sessions.hasNext());
            final Long count = sessions.next().value;
            Assert.assertEquals(Long.valueOf(2L), count);
        }
    }

    /**
     * Verifies that a record falling between two existing sessions of the same key merges
     * them into one session holding all three records.
     */
    @Test
    public void shouldMergeSessions() {
        final String sessionId = "mel";
        final long secondSessionTs = GAP_MS + 1;

        processor.process(new Record<>(sessionId, "first", 0L));
        Assert.assertTrue(hasSession(sessionId, 0L, 0L));

        // More than one gap after the first record: a second, separate session must exist.
        processor.process(new Record<>(sessionId, "second", secondSessionTs));
        Assert.assertTrue(hasSession(sessionId, secondSessionTs, secondSessionTs));
        Assert.assertTrue(hasSession(sessionId, 0L, 0L));

        // Half a gap after the first record is within the gap of both sessions, so they merge.
        processor.process(new Record<>(sessionId, "third", GAP_MS / 2));

        try (final KeyValueIterator<Windowed<String>, Long> iterator =
                 sessionStore.findSessions(sessionId, 0L, secondSessionTs)) {
            final KeyValue<Windowed<String>, Long> kv = iterator.next();
            Assert.assertEquals(Long.valueOf(3L), kv.value);
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * Reports whether the store returns at least one session for the given query, closing the
     * store iterator before returning so it is not leaked.
     *
     * @param key                     session key to look up
     * @param earliestSessionEndTime  passed through to {@code findSessions}
     * @param latestSessionStartTime  passed through to {@code findSessions}
     * @return {@code true} if the query yields at least one session
     */
    private boolean hasSession(final String key, final long earliestSessionEndTime, final long latestSessionStartTime) {
        try (final KeyValueIterator<Windowed<String>, Long> sessions =
                 sessionStore.findSessions(key, earliestSessionEndTime, latestSessionStartTime)) {
            return sessions.hasNext();
        }
    }

    /** Two records with the same key and timestamp must update one session rather than create two. */
    @Test
    public void shouldUpdateSessionIfTheSameTime() {
        final String key = "mel";
        processor.process(new Record<>(key, "first", 0L));
        processor.process(new Record<>(key, "second", 0L));

        try (final KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions(key, 0L, 0L)) {
            final Long count = sessions.next().value;
            Assert.assertEquals(Long.valueOf(2L), count);
            Assert.assertFalse(sessions.hasNext());
        }
    }

    /**
     * Records of one key that are more than the gap apart must form separate sessions.
     * With emit-final only the first two sessions are expected in the results;
     * otherwise all three are.
     */
    @Test
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
        final String key = "mel";
        final long firstTs = 0L;
        final long secondTs = firstTs + GAP_MS + 1;
        final long thirdTs = secondTs + GAP_MS + 1;

        processor.process(new Record<>(key, "first", firstTs));
        processor.process(new Record<>(key, "second", secondTs));
        processor.process(new Record<>(key, "second", secondTs));
        processor.process(new Record<>(key, "third", thirdTs));
        processor.process(new Record<>(key, "third", thirdTs));
        processor.process(new Record<>(key, "third", thirdTs));
        sessionStore.flush();

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>(key, new SessionWindow(firstTs, firstTs)),
            new Change<>(1L, null),
            firstTs));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>(key, new SessionWindow(secondTs, secondTs)),
            new Change<>(2L, null),
            secondTs));
        if (!emitFinal) {
            // The last session is only part of the expected output when not emitting on window close.
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>(key, new SessionWindow(thirdTs, thirdTs)),
                new Change<>(3L, null),
                thirdTs));
        }
        Assert.assertEquals(expected, results);
    }

    /**
     * Verifies that once a second record extends a session, the store holds only the merged
     * session [0, 100] and no longer the original [0, 0] session.
     */
    @Test
    public void shouldRemoveMergedSessionsFromStateStore() {
        processor.process(new Record<>("a", "1", 0L));

        // The first record creates a single-point session with count 1.
        try (final KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0L, 0L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 0L)), 1L), a1.next());
        }

        processor.process(new Record<>("a", "2", 100L));

        // After the merge exactly one session must remain for the key.
        try (final KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0L, 100L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 100L)), 2L), a2.next());
            Assert.assertFalse(a2.hasNext());
        }
    }

    /**
     * Feeds interleaved records for keys a-d and checks the forwarded results after a flush.
     * The first four sessions are expected for both emit strategies; the three later
     * sessions are expected only when not emitting on window close.
     */
    @Test
    public void shouldHandleMultipleSessionsAndMerging() {
        final long halfGap = GAP_MS / 2;
        final long afterGap = GAP_MS + 1;
        final long afterGapAndHalf = afterGap + halfGap;

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("b", "1", 0L));
        processor.process(new Record<>("c", "1", 0L));
        processor.process(new Record<>("d", "1", 0L));
        processor.process(new Record<>("d", "2", halfGap));
        processor.process(new Record<>("a", "2", afterGap));
        processor.process(new Record<>("b", "2", afterGap));
        processor.process(new Record<>("a", "3", afterGapAndHalf));
        processor.process(new Record<>("c", "3", afterGapAndHalf));
        sessionStore.flush();

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("a", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("b", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("c", new SessionWindow(0L, 0L)), new Change<>(1L, null), 0L));
        expected.add(new KeyValueTimestamp<>(
            new Windowed<>("d", new SessionWindow(0L, halfGap)), new Change<>(2L, null), halfGap));
        if (!emitFinal) {
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("b", new SessionWindow(afterGap, afterGap)), new Change<>(1L, null), afterGap));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("a", new SessionWindow(afterGap, afterGapAndHalf)), new Change<>(2L, null), afterGapAndHalf));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>("c", new SessionWindow(afterGapAndHalf, afterGapAndHalf)), new Change<>(1L, null), afterGapAndHalf));
        }
        Assert.assertEquals(expected, results);
    }

    /** The value getter obtained from the aggregate's view must return the per-session counts. */
    @Test
    public void shouldGetAggregatedValuesFromValueGetter() {
        final long secondSessionTs = GAP_MS + 1;
        final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
        getter.init(context);

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("a", "1", secondSessionTs));
        processor.process(new Record<>("a", "2", secondSessionTs));

        final Windowed<String> firstSession = new Windowed<>("a", new SessionWindow(0L, 0L));
        final Windowed<String> secondSession = new Windowed<>("a", new SessionWindow(secondSessionTs, secondSessionTs));
        final long t0 = getter.get(firstSession).value();
        final long t1 = getter.get(secondSession).value();

        Assert.assertEquals(1L, t0);
        Assert.assertEquals(2L, t1);
    }

    /**
     * With caching disabled, every new session must be forwarded as soon as its record is processed.
     * Does nothing when running with emit-final.
     */
    @Test
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
        if (emitFinal) {
            return;
        }
        // Swap in a store without caching and re-initialize the processor against it.
        initStore(false);
        processor.init(context);

        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = new ArrayList<>();
        for (final String key : Arrays.asList("a", "b", "c")) {
            processor.process(new Record<>(key, "1", 0L));
            expected.add(new KeyValueTimestamp<>(
                new Windowed<>(key, new SessionWindow(0L, 0L)),
                new Change<>(1L, null),
                0L));
        }
        Assert.assertEquals(expected, results);
    }

    /**
     * With caching disabled, a merge must forward the removal of the old session (a change with
     * null values) followed by the merged session. Does nothing when running with emit-final.
     */
    @Test
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
        if (emitFinal) {
            return;
        }
        // Swap in a store without caching and re-initialize the processor against it.
        initStore(false);
        processor.init(context);

        processor.process(new Record<>("a", "1", 0L));
        processor.process(new Record<>("a", "1", 5L));

        final Windowed<String> originalSession = new Windowed<>("a", new SessionWindow(0L, 0L));
        final Windowed<String> mergedSession = new Windowed<>("a", new SessionWindow(0L, 5L));
        final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> expected = Arrays.asList(
            new KeyValueTimestamp<>(originalSession, new Change<Long>(1L, null), 0L),
            new KeyValueTimestamp<>(originalSession, new Change<Long>(null, null), 0L),
            new KeyValueTimestamp<>(mergedSession, new Change<Long>(2L, null), 5L));
        Assert.assertEquals(expected, results);
    }

    /**
     * A record with a null key must produce the expected WARN log line and set the task-level
     * dropped-records total to 1.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics() {
        setup(false);
        context.setRecordContext(new ProcessorRecordContext(-1L, -2L, -3, "topic", new RecordHeaders()));

        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            processor.process(new Record<String, String>(null, "1", 0L));

            // Only WARN-level messages are relevant for the assertion.
            final List<String> warnings = appender.getEvents().stream()
                .filter(event -> event.getLevel().equals("WARN"))
                .map(LogCaptureAppender.Event::getMessage)
                .collect(Collectors.toList());
            MatcherAssert.assertThat(
                warnings,
                CoreMatchers.hasItem("Skipping record due to null key. topic=[topic] partition=[-3] offset=[-2]"));
        }

        final Object droppedTotal = StreamsTestUtils
            .getMetricByName(context.metrics().metrics(), "dropped-records-total", "stream-task-metrics")
            .metricValue();
        Assert.assertEquals(Double.valueOf(1.0), droppedTotal);
    }

    /**
     * With a 10 ms gap and zero grace, a record at timestamp 0 arriving after stream time has
     * reached 11 must be logged as skipped and counted in the dropped-records metrics.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace() {
        setup(false);
        final Processor<String, String, Windowed<String>, Change<Long>> zeroGraceProcessor =
            new KStreamSessionWindowAggregate<String, String, Long>(
                SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(0L)),
                STORE_NAME,
                EmitStrategy.onWindowUpdate(),
                initializer,
                aggregator,
                sessionMerger
            ).get();
        zeroGraceProcessor.init(context);

        // Records processed before log capture starts; the last one moves stream time to 11.
        final List<Record<String, String>> precedingRecords = Arrays.asList(
            new Record<>("dummy", "dummy", 0L),
            new Record<>("OnTime1", "1", 0L),
            new Record<>("dummy", "dummy", 11L));
        for (final Record<String, String> record : precedingRecords) {
            context.setRecordContext(new ProcessorRecordContext(record.timestamp(), -2L, -3, "topic", new RecordHeaders()));
            zeroGraceProcessor.process(record);
        }

        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            context.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", new RecordHeaders()));
            zeroGraceProcessor.process(new Record<>("Late1", "1", 0L));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[11]"));
        }

        final Map<String, String> tags = Utils.mkMap(
            Utils.mkEntry("thread-id", threadId),
            Utils.mkEntry("task-id", "0_0"));
        final MetricName dropTotal = new MetricName(
            "dropped-records-total", "stream-task-metrics", "The total number of dropped records", tags);
        final MetricName dropRate = new MetricName(
            "dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", tags);

        MatcherAssert.assertThat(metrics.metrics().get(dropTotal).metricValue(), CoreMatchers.is((Object) 1.0));
        MatcherAssert.assertThat((Double) metrics.metrics().get(dropRate).metricValue(), Matchers.greaterThan(0.0));
    }

    /**
     * With a 10 ms gap and 1 ms grace, a record at timestamp 0 arriving after stream time has
     * reached 12 must be logged as skipped, and exactly one record must be counted as dropped.
     */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace() {
        setup(false);
        final Processor<String, String, Windowed<String>, Change<Long>> graceProcessor =
            new KStreamSessionWindowAggregate<String, String, Long>(
                SessionWindows.ofInactivityGapAndGrace(Duration.ofMillis(10L), Duration.ofMillis(1L)),
                STORE_NAME,
                EmitStrategy.onWindowUpdate(),
                initializer,
                aggregator,
                sessionMerger
            ).get();
        graceProcessor.init(context);

        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            // Each record is processed under a record context carrying the record's own timestamp.
            final List<Record<String, String>> records = Arrays.asList(
                new Record<>("dummy", "dummy", 0L),
                new Record<>("OnTime1", "1", 0L),
                new Record<>("dummy", "dummy", 11L),
                new Record<>("OnTime2", "1", 0L),
                new Record<>("dummy", "dummy", 12L),
                new Record<>("Late1", "1", 0L));
            for (final Record<String, String> record : records) {
                context.setRecordContext(new ProcessorRecordContext(record.timestamp(), -2L, -3, "topic", new RecordHeaders()));
                graceProcessor.process(record);
            }

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[12]"));
        }

        final Map<String, String> tags = Utils.mkMap(
            Utils.mkEntry("thread-id", threadId),
            Utils.mkEntry("task-id", "0_0"));
        final MetricName dropTotal = new MetricName(
            "dropped-records-total", "stream-task-metrics", "The total number of dropped records", tags);
        final MetricName dropRate = new MetricName(
            "dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", tags);

        MatcherAssert.assertThat(metrics.metrics().get(dropTotal).metricValue(), CoreMatchers.is((Object) 1.0));
        MatcherAssert.assertThat((Double) metrics.metrics().get(dropRate).metricValue(), Matchers.greaterThan(0.0));
    }
}

