/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricValueProvider;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.ActiveTaskCreator;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsNot;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class ActiveTaskCreatorTest {
    @Mock
    private InternalTopologyBuilder builder;
    @Mock
    private StateDirectory stateDirectory;
    @Mock
    private ChangelogReader changeLogReader;
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "clientId", "latest", (Time)new MockTime());
    private final Map<String, Object> properties = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)"appId"), Utils.mkEntry((Object)"bootstrap.servers", (Object)"dummy:1234")});
    final UUID uuid = UUID.randomUUID();
    private ActiveTaskCreator activeTaskCreator;

    @Test
    public void shouldConstructProducerMetricsWithEosDisabled() {
        this.shouldConstructThreadProducerMetric();
    }

    @Test
    public void shouldConstructClientIdWithEosDisabled() {
        this.createTasks();
        Set clientIds = this.activeTaskCreator.producerClientIds();
        MatcherAssert.assertThat((Object)clientIds, (Matcher)CoreMatchers.is(Collections.singleton("clientId-StreamThread-0-producer")));
    }

    @Test
    public void shouldCloseThreadProducerIfEosDisabled() {
        this.createTasks();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldNoOpCloseTaskProducerIfEosDisabled() {
        this.createTasks();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldReturnBlockedTimeWhenThreadProducer() {
        double blockedTime = 123.0;
        this.createTasks();
        MockProducer<byte[], byte[]> producer = this.mockClientSupplier.producers.get(0);
        this.addMetric(producer, "flush-time-ns-total", 123.0);
        MatcherAssert.assertThat((Object)this.activeTaskCreator.totalProducerBlockedTime(), (Matcher)Matchers.closeTo((double)123.0, (double)0.01));
    }

    @Test
    public void shouldFailOnStreamsProducerPerTaskIfEosDisabled() {
        this.createTasks();
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> this.activeTaskCreator.streamsProducerForTask(null));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Expected EXACTLY_ONCE to be enabled, but the processing mode was AT_LEAST_ONCE"));
    }

    @Test
    public void shouldFailOnGetThreadProducerIfEosDisabled() {
        this.createTasks();
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> ((ActiveTaskCreator)this.activeTaskCreator).threadProducer());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was AT_LEAST_ONCE"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosDisabled() {
        this.createTasks();
        this.mockClientSupplier.producers.get((int)0).closeException = new RuntimeException("KABOOM!");
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> ((ActiveTaskCreator)this.activeTaskCreator).closeThreadProducerIfNeeded());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Thread producer encounter error trying to close."));
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)CoreMatchers.is((Object)"KABOOM!"));
    }

    @Test
    public void shouldReturnStreamsProducerPerTaskIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.shouldReturnStreamsProducerPerTask();
    }

    @Test
    public void shouldConstructProducerMetricsWithEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.shouldConstructProducerMetricsPerTask();
    }

    @Test
    public void shouldConstructClientIdWithEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        Set clientIds = this.activeTaskCreator.producerClientIds();
        MatcherAssert.assertThat((Object)clientIds, (Matcher)CoreMatchers.is((Object)Utils.mkSet((Object[])new String[]{"clientId-StreamThread-0-0_0-producer", "clientId-StreamThread-0-0_1-producer"})));
    }

    @Test
    public void shouldNoOpCloseThreadProducerIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)false));
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(1).closed(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldCloseTaskProducersIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 2));
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(1).closed(), (Matcher)CoreMatchers.is((Object)true));
        this.mockClientSupplier.producers.get((int)0).closeException = new RuntimeException("KABOOM!");
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
    }

    @Test
    public void shouldReturnBlockedTimeWhenTaskProducers() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        double total = 0.0;
        double blocked = 1.0;
        for (MockProducer<byte[], byte[]> producer : this.mockClientSupplier.producers) {
            this.addMetric(producer, "flush-time-ns-total", blocked);
            total += blocked;
            blocked += 1.0;
        }
        MatcherAssert.assertThat((Object)this.activeTaskCreator.totalProducerBlockedTime(), (Matcher)Matchers.closeTo((double)total, (double)0.01));
    }

    @Test
    public void shouldFailForUnknownTaskOnStreamsProducerPerTaskIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> this.activeTaskCreator.streamsProducerForTask(null));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Unknown TaskId: null"));
        thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> this.activeTaskCreator.streamsProducerForTask(new TaskId(0, 2)));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Unknown TaskId: 0_2"));
    }

    @Test
    public void shouldFailOnGetThreadProducerIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> ((ActiveTaskCreator)this.activeTaskCreator).threadProducer());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Expected EXACTLY_ONCE_V2 to be enabled, but the processing mode was EXACTLY_ONCE_ALPHA"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseTaskProducerIfEosAlphaEnabled() {
        this.properties.put("processing.guarantee", "exactly_once");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.mockClientSupplier.producers.get((int)0).closeException = new RuntimeException("KABOOM!");
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0)));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"[0_0] task producer encounter error trying to close."));
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)CoreMatchers.is((Object)"KABOOM!"));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
    }

    @Test
    public void shouldReturnThreadProducerIfEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        StreamsProducer threadProducer = this.activeTaskCreator.threadProducer();
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat((Object)threadProducer.kafkaProducer(), (Matcher)CoreMatchers.is(this.mockClientSupplier.producers.get(0)));
    }

    @Test
    public void shouldConstructProducerMetricsWithEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.shouldConstructThreadProducerMetric();
    }

    @Test
    public void shouldConstructClientIdWithEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        Set clientIds = this.activeTaskCreator.producerClientIds();
        MatcherAssert.assertThat((Object)clientIds, (Matcher)CoreMatchers.is(Collections.singleton("clientId-StreamThread-0-producer")));
    }

    @Test
    public void shouldCloseThreadProducerIfEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.activeTaskCreator.closeThreadProducerIfNeeded();
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)true));
    }

    @Test
    public void shouldNoOpCloseTaskProducerIfEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 0));
        this.activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(new TaskId(0, 1));
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.get(0).closed(), (Matcher)CoreMatchers.is((Object)false));
    }

    @Test
    public void shouldFailOnStreamsProducerPerTaskIfEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> this.activeTaskCreator.streamsProducerForTask(null));
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Expected EXACTLY_ONCE to be enabled, but the processing mode was EXACTLY_ONCE_V2"));
    }

    @Test
    public void shouldThrowStreamsExceptionOnErrorCloseThreadProducerIfEosV2Enabled() {
        this.properties.put("processing.guarantee", "exactly_once_v2");
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        this.mockClientSupplier.producers.get((int)0).closeException = new RuntimeException("KABOOM!");
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> ((ActiveTaskCreator)this.activeTaskCreator).closeThreadProducerIfNeeded());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)CoreMatchers.is((Object)"Thread producer encounter error trying to close."));
        MatcherAssert.assertThat((Object)thrown.getCause().getMessage(), (Matcher)CoreMatchers.is((Object)"KABOOM!"));
    }

    private void shouldReturnStreamsProducerPerTask() {
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        StreamsProducer streamsProducer1 = this.activeTaskCreator.streamsProducerForTask(new TaskId(0, 0));
        StreamsProducer streamsProducer2 = this.activeTaskCreator.streamsProducerForTask(new TaskId(0, 1));
        MatcherAssert.assertThat((Object)streamsProducer1, (Matcher)IsNot.not((Matcher)CoreMatchers.is((Object)streamsProducer2)));
    }

    private void shouldConstructProducerMetricsPerTask() {
        this.mockClientSupplier.setApplicationIdForProducer("appId");
        this.createTasks();
        MetricName testMetricName1 = new MetricName("test_metric_1", "", "", new HashMap());
        KafkaMetric testMetric1 = new KafkaMetric(new Object(), testMetricName1, (MetricValueProvider)((Measurable)(config, now) -> 0.0), null, (Time)new MockTime());
        this.mockClientSupplier.producers.get(0).setMockMetrics(testMetricName1, (Metric)testMetric1);
        MetricName testMetricName2 = new MetricName("test_metric_2", "", "", new HashMap());
        KafkaMetric testMetric2 = new KafkaMetric(new Object(), testMetricName2, (MetricValueProvider)((Measurable)(config, now) -> 0.0), null, (Time)new MockTime());
        this.mockClientSupplier.producers.get(0).setMockMetrics(testMetricName2, (Metric)testMetric2);
        Map producerMetrics = this.activeTaskCreator.producerMetrics();
        MatcherAssert.assertThat((Object)producerMetrics, (Matcher)CoreMatchers.is((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)testMetricName1, (Object)testMetric1), Utils.mkEntry((Object)testMetricName2, (Object)testMetric2)})));
    }

    private void shouldConstructThreadProducerMetric() {
        this.createTasks();
        MetricName testMetricName = new MetricName("test_metric", "", "", new HashMap());
        KafkaMetric testMetric = new KafkaMetric(new Object(), testMetricName, (MetricValueProvider)((Measurable)(config, now) -> 0.0), null, (Time)new MockTime());
        this.mockClientSupplier.producers.get(0).setMockMetrics(testMetricName, (Metric)testMetric);
        MatcherAssert.assertThat((Object)this.mockClientSupplier.producers.size(), (Matcher)CoreMatchers.is((Object)1));
        Map producerMetrics = this.activeTaskCreator.producerMetrics();
        MatcherAssert.assertThat((Object)producerMetrics.size(), (Matcher)CoreMatchers.is((Object)1));
        MatcherAssert.assertThat(producerMetrics.get(testMetricName), (Matcher)CoreMatchers.is((Object)testMetric));
    }

    private void createTasks() {
        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        ProcessorTopology topology = (ProcessorTopology)Mockito.mock(ProcessorTopology.class);
        SourceNode sourceNode = (SourceNode)Mockito.mock(SourceNode.class);
        Mockito.when((Object)this.builder.topologyConfigs()).thenReturn((Object)new TopologyConfig(new StreamsConfig(this.properties)));
        Mockito.when((Object)this.builder.buildSubtopology(0)).thenReturn((Object)topology);
        Mockito.when((Object)topology.sinkTopics()).thenReturn(Collections.emptySet());
        Mockito.when((Object)this.stateDirectory.getOrCreateDirectoryForTask(task00)).thenReturn(Mockito.mock(File.class));
        Mockito.when((Object)this.stateDirectory.checkpointFileFor(task00)).thenReturn(Mockito.mock(File.class));
        Mockito.when((Object)this.stateDirectory.getOrCreateDirectoryForTask(task01)).thenReturn(Mockito.mock(File.class));
        Mockito.when((Object)this.stateDirectory.checkpointFileFor(task01)).thenReturn(Mockito.mock(File.class));
        Mockito.when((Object)topology.source("topic")).thenReturn((Object)sourceNode);
        Mockito.when((Object)sourceNode.getTimestampExtractor()).thenReturn(Mockito.mock(TimestampExtractor.class));
        Mockito.when((Object)topology.sources()).thenReturn(Collections.singleton(sourceNode));
        StreamsConfig config = new StreamsConfig(this.properties);
        this.activeTaskCreator = new ActiveTaskCreator(new TopologyMetadata(this.builder, config), config, this.streamsMetrics, this.stateDirectory, this.changeLogReader, new ThreadCache(new LogContext(), 0L, this.streamsMetrics), (Time)new MockTime(), (KafkaClientSupplier)this.mockClientSupplier, "clientId-StreamThread-0", this.uuid, new LogContext().logger(ActiveTaskCreator.class), false);
        MatcherAssert.assertThat(this.activeTaskCreator.createTasks(this.mockClientSupplier.consumer, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)task00, Collections.singleton(new TopicPartition("topic", 0))), Utils.mkEntry((Object)task01, Collections.singleton(new TopicPartition("topic", 1)))})).stream().map(Task::id).collect(Collectors.toSet()), (Matcher)CoreMatchers.equalTo((Object)Utils.mkSet((Object[])new TaskId[]{task00, task01})));
    }

    private void addMetric(MockProducer<?, ?> producer, String name, final double value) {
        final MetricName metricName = this.metricName(name);
        producer.setMockMetrics(metricName, new Metric(){

            public MetricName metricName() {
                return metricName;
            }

            public Object metricValue() {
                return value;
            }
        });
    }

    private MetricName metricName(String name) {
        return new MetricName(name, "", "", Collections.emptyMap());
    }
}

