/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ResponseJoinProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Assert;
import org.junit.Test;

public class ResponseJoinProcessorSupplierTest {
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final ValueJoiner<String, String, String> JOINER = (value1, value2) -> "(" + value1 + "," + value2 + ")";

    @Test
    public void shouldNotForwardWhenHashDoesNotMatch() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] oldHash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "oldLhsValue"));
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(oldHash, (Object)"rhsValue", Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded, (Matcher)IsEmptyCollection.empty());
        Assert.assertEquals((Object)1.0, (Object)ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assert.assertNotEquals((Object)0.0, (Object)ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    @Test
    public void shouldIgnoreUpdateWhenLeftHasBecomeNull() {
        TestKTableValueGetterSupplier<String, Object> valueGetterSupplier = new TestKTableValueGetterSupplier<String, Object>();
        boolean leftJoin = false;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", null);
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, (Object)"rhsValue", Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded, (Matcher)IsEmptyCollection.empty());
        Assert.assertEquals((Object)1.0, (Object)ResponseJoinProcessorSupplierTest.getDroppedRecordsTotalMetric(context));
        Assert.assertNotEquals((Object)0.0, (Object)ResponseJoinProcessorSupplierTest.getDroppedRecordsRateMetric(context));
    }

    @Test
    public void shouldForwardWhenHashMatches() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, (Object)"rhsValue", Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).record(), (Matcher)CoreMatchers.is((Object)new Record((Object)"lhs1", (Object)"(lhsValue,rhsValue)", 0L)));
    }

    @Test
    public void shouldEmitTombstoneForInnerJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = false;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, false);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null, Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).record(), (Matcher)CoreMatchers.is((Object)new Record((Object)"lhs1", null, 0L)));
    }

    @Test
    public void shouldEmitResultForLeftJoinWhenRightIsNull() {
        TestKTableValueGetterSupplier<String, String> valueGetterSupplier = new TestKTableValueGetterSupplier<String, String>();
        boolean leftJoin = true;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, true);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", "lhsValue");
        long[] hash = Murmur3.hash128((byte[])STRING_SERIALIZER.serialize("topic-join-resolver", "lhsValue"));
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null, Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).record(), (Matcher)CoreMatchers.is((Object)new Record((Object)"lhs1", (Object)"(lhsValue,null)", 0L)));
    }

    @Test
    public void shouldEmitTombstoneForLeftJoinWhenRightIsNullAndLeftIsNull() {
        TestKTableValueGetterSupplier<String, Object> valueGetterSupplier = new TestKTableValueGetterSupplier<String, Object>();
        boolean leftJoin = true;
        ResponseJoinProcessorSupplier processorSupplier = new ResponseJoinProcessorSupplier(valueGetterSupplier, (Serializer)STRING_SERIALIZER, () -> "value-hash-dummy-topic", JOINER, true);
        Processor processor = processorSupplier.get();
        MockInternalNewProcessorContext context = new MockInternalNewProcessorContext();
        processor.init(context);
        context.setRecordMetadata("topic", 0, 0L);
        valueGetterSupplier.put("lhs1", null);
        long[] hash = null;
        processor.process(new Record((Object)"lhs1", (Object)new SubscriptionResponseWrapper(hash, null, Integer.valueOf(0)), 0L));
        List forwarded = context.forwarded();
        MatcherAssert.assertThat((Object)forwarded.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)((MockProcessorContext.CapturedForward)forwarded.get(0)).record(), (Matcher)CoreMatchers.is((Object)new Record((Object)"lhs1", null, 0L)));
    }

    static Object getDroppedRecordsTotalMetric(InternalProcessorContext<String, ?> context) {
        MetricName dropTotalMetric = new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        return ((Metric)context.metrics().metrics().get(dropTotalMetric)).metricValue();
    }

    static Object getDroppedRecordsRateMetric(InternalProcessorContext<String, ?> context) {
        MetricName dropRateMetric = new MetricName("dropped-records-rate", "stream-task-metrics", "The average number of dropped records per second", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName()), Utils.mkEntry((Object)"task-id", (Object)"0_0")}));
        return ((Metric)context.metrics().metrics().get(dropRateMetric)).metricValue();
    }

    private static class TestKTableValueGetterSupplier<K, V>
    implements KTableValueGetterSupplier<K, V> {
        private final Map<K, V> map = new HashMap();

        private TestKTableValueGetterSupplier() {
        }

        public KTableValueGetter<K, V> get() {
            return new KTableValueGetter<K, V>(){

                public void init(ProcessorContext<?, ?> context) {
                }

                public ValueAndTimestamp<V> get(K key) {
                    return ValueAndTimestamp.make(map.get(key), (long)-1L);
                }

                public boolean isVersioned() {
                    return false;
                }
            };
        }

        public String[] storeNames() {
            return new String[0];
        }

        void put(K key, V value) {
            this.map.put(key, value);
        }
    }
}

