package org.apache.storm.streams;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.storm.shade.com.google.common.collect.Multimap;
import org.apache.storm.shade.org.jgrapht.DirectedGraph;
import org.apache.storm.shade.org.jgrapht.graph.DefaultDirectedGraph;
import org.apache.storm.state.KeyValueState;
import org.apache.storm.streams.operations.StateUpdater;
import org.apache.storm.streams.processors.Processor;
import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/storm/streams/StatefulProcessorBoltTest.class */
public class StatefulProcessorBoltTest {
    TopologyContext mockTopologyContext;
    OutputCollector mockOutputCollector;
    StatefulProcessorBolt<String, Long> bolt;
    Tuple mockTuple1;
    DirectedGraph<Node, Edge> graph;
    Multimap<String, ProcessorNode> mockStreamToProcessors;
    KeyValueState<String, Long> mockKeyValueState;

    @BeforeEach
    public void setUp() throws Exception {
        this.mockTopologyContext = (TopologyContext) Mockito.mock(TopologyContext.class);
        this.mockOutputCollector = (OutputCollector) Mockito.mock(OutputCollector.class);
        this.mockTuple1 = (Tuple) Mockito.mock(Tuple.class);
        this.mockStreamToProcessors = (Multimap) Mockito.mock(Multimap.class);
        this.mockKeyValueState = (KeyValueState) Mockito.mock(KeyValueState.class);
        setUpMockTuples(this.mockTuple1);
    }

    @Test
    public void testEmitAndAck() {
        setUpStatefulProcessorBolt(new UpdateStateByKeyProcessor(new StateUpdater<Object, Long>() { // from class: org.apache.storm.streams.StatefulProcessorBoltTest.1
            /* renamed from: init, reason: merged with bridge method [inline-methods] */
            public Long m14init() {
                return 0L;
            }

            public Long apply(Long l, Object obj) {
                return Long.valueOf(l.longValue() + 1);
            }
        }));
        this.bolt.execute(this.mockTuple1);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(Collection.class);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(Values.class);
        ArgumentCaptor forClass3 = ArgumentCaptor.forClass(String.class);
        ((OutputCollector) Mockito.verify(this.mockOutputCollector)).emit((String) forClass3.capture(), (Collection) forClass.capture(), (List) forClass2.capture());
        Assertions.assertEquals("outputstream", forClass3.getValue());
        Assertions.assertArrayEquals(new Object[]{this.mockTuple1}, ((Collection) forClass.getValue()).toArray());
        Assertions.assertEquals(new Values(new Object[]{"k", 1L}), forClass2.getValue());
        ((OutputCollector) Mockito.verify(this.mockOutputCollector, Mockito.times(1))).ack(this.mockTuple1);
        ((KeyValueState) Mockito.verify(this.mockKeyValueState, Mockito.times(1))).put("k", 1L);
    }

    private void setUpStatefulProcessorBolt(Processor<?> processor) {
        ProcessorNode processorNode = new ProcessorNode(processor, "outputstream", new Fields(new String[]{"value"}));
        processorNode.setEmitsPair(true);
        Mockito.when(this.mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(processorNode));
        this.graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
        this.graph.addVertex(processorNode);
        this.bolt = new StatefulProcessorBolt<>("bolt1", this.graph, Collections.singletonList(processorNode));
        this.bolt.setStreamToInitialProcessors(this.mockStreamToProcessors);
        this.bolt.prepare(new HashMap(), this.mockTopologyContext, this.mockOutputCollector);
        this.bolt.initState(this.mockKeyValueState);
    }

    private void setUpMockTuples(Tuple... tupleArr) {
        for (Tuple tuple : tupleArr) {
            Mockito.when(Integer.valueOf(tuple.size())).thenReturn(1);
            Mockito.when(tuple.getValue(0)).thenReturn(Pair.of("k", "v"));
            Mockito.when(tuple.getSourceComponent()).thenReturn("bolt0");
            Mockito.when(tuple.getSourceStreamId()).thenReturn("inputstream");
        }
    }
}
