package org.apache.kafka.streams.kstream.internals;

import java.util.Random;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.test.KStreamTestDriver;
import org.apache.kafka.test.MockProcessorSupplier;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest.class */
public class AbstractStreamTest {
    private final String topicName = "topic";

    @Rule
    public final KStreamTestDriver driver = new KStreamTestDriver();

    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest$ExtendedKStream.class */
    private class ExtendedKStream<K, V> extends AbstractStream<K> {
        ExtendedKStream(KStream<K, V> kStream) {
            super((KStreamImpl) kStream);
        }

        KStream<K, V> randomFilter() {
            String newProcessorName = this.builder.newProcessorName("RANDOM-FILTER-");
            this.builder.internalTopologyBuilder.addProcessor(newProcessorName, new ExtendedKStreamDummy(), new String[]{this.name});
            return new KStreamImpl(this.builder, newProcessorName, this.sourceNodes, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest$ExtendedKStreamDummy.class */
    public class ExtendedKStreamDummy<K, V> implements ProcessorSupplier<K, V> {
        private Random rand = new Random();

        /* loaded from: input_file:org/apache/kafka/streams/kstream/internals/AbstractStreamTest$ExtendedKStreamDummy$ExtendedKStreamDummyProcessor.class */
        private class ExtendedKStreamDummyProcessor extends AbstractProcessor<K, V> {
            private ExtendedKStreamDummyProcessor() {
            }

            public void process(K k, V v) {
                if (ExtendedKStreamDummy.this.rand.nextBoolean()) {
                    context().forward(k, v);
                }
            }
        }

        ExtendedKStreamDummy() {
        }

        public Processor<K, V> get() {
            return new ExtendedKStreamDummyProcessor();
        }
    }

    @Test
    public void testShouldBeExtensible() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        int[] iArr = {1, 2, 3, 4, 5, 6, 7};
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        new ExtendedKStream(streamsBuilder.stream("topic", Consumed.with(Serdes.Integer(), Serdes.String()))).randomFilter().process(mockProcessorSupplier, new String[0]);
        this.driver.setUp(streamsBuilder);
        for (int i : iArr) {
            this.driver.process("topic", Integer.valueOf(i), "V" + i);
        }
        Assert.assertTrue(mockProcessorSupplier.processed.size() <= iArr.length);
    }
}
