/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsContext;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorMetadata;
import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TopicPartitionMetadata;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.verification.VerificationMode;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class StreamTaskTest {
    private static final String APPLICATION_ID = "stream-task-test";
    private static final File BASE_DIR = TestUtils.tempDirectory();
    private final LogContext logContext = new LogContext("[test] ");
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final TopicPartition partition1 = new TopicPartition("topic1", 1);
    private final TopicPartition partition2 = new TopicPartition("topic2", 1);
    private final Set<TopicPartition> partitions = Utils.mkSet((Object[])new TopicPartition[]{this.partition1, this.partition2});
    private final Serializer<Integer> intSerializer = Serdes.Integer().serializer();
    private final Deserializer<Integer> intDeserializer = Serdes.Integer().deserializer();
    private final MockSourceNode<Integer, Integer> source1 = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source2 = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer);
    private final MockSourceNode<Integer, Integer> source3 = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer){

        @Override
        public void process(Record<Integer, Integer> record) {
            throw new RuntimeException("KABOOM!");
        }

        @Override
        public void close() {
            throw new RuntimeException("KABOOM!");
        }
    };
    private final MockSourceNode<Integer, Integer> timeoutSource = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer){

        @Override
        public void process(Record<Integer, Integer> record) {
            throw new TimeoutException("Kaboom!");
        }
    };
    private final MockProcessorNode<Integer, Integer, ?, ?> processorStreamTime = new MockProcessorNode(10L);
    private final MockProcessorNode<Integer, Integer, ?, ?> processorSystemTime = new MockProcessorNode(10L, PunctuationType.WALL_CLOCK_TIME);
    private final String storeName = "store";
    private final MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
    private final TopicPartition changelogPartition = new TopicPartition("store-changelog", 1);
    private final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST);
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    private MockTime time = new MockTime();
    private Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG), (Time)this.time);
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(this.metrics);
    private StateDirectory stateDirectory;
    private StreamTask task;
    private long punctuatedAt;
    @Mock
    private ProcessorStateManager stateManager;
    @Mock
    private RecordCollector recordCollector;
    @Mock
    private ThreadCache cache;
    private final Punctuator punctuator = new Punctuator(){

        public void punctuate(long timestamp) {
            StreamTaskTest.this.punctuatedAt = timestamp;
        }
    };

    private static ProcessorTopology withRepartitionTopics(List<ProcessorNode<?, ?, ?, ?>> processorNodes, Map<String, SourceNode<?, ?>> sourcesByTopic, Set<String> repartitionTopics) {
        return new ProcessorTopology(processorNodes, sourcesByTopic, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), repartitionTopics, Collections.emptyMap());
    }

    private static ProcessorTopology withSources(List<ProcessorNode<?, ?, ?, ?>> processorNodes, Map<String, SourceNode<?, ?>> sourcesByTopic) {
        return new ProcessorTopology(processorNodes, sourcesByTopic, Collections.emptyMap(), Collections.emptyList(), Collections.emptyList(), Collections.emptyMap(), Collections.emptySet(), Collections.emptyMap());
    }

    private static StreamsConfig createConfig() {
        return StreamTaskTest.createConfig("0");
    }

    private static StreamsConfig createConfig(String enforcedProcessingValue) {
        return StreamTaskTest.createConfig("at_least_once", enforcedProcessingValue);
    }

    private static StreamsConfig createConfig(String eosConfig, String enforcedProcessingValue) {
        return StreamTaskTest.createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
    }

    private static StreamsConfig createConfig(String eosConfig, String enforcedProcessingValue, String deserializationExceptionHandler) {
        String canonicalPath;
        try {
            canonicalPath = BASE_DIR.getCanonicalPath();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        return new StreamsConfig((Map)Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"application.id", (Object)APPLICATION_ID), Utils.mkEntry((Object)"bootstrap.servers", (Object)"localhost:2171"), Utils.mkEntry((Object)"buffered.records.per.partition", (Object)"3"), Utils.mkEntry((Object)"state.dir", (Object)canonicalPath), Utils.mkEntry((Object)"metrics.recording.level", (Object)Sensor.RecordingLevel.DEBUG.name), Utils.mkEntry((Object)"default.timestamp.extractor", (Object)MockTimestampExtractor.class.getName()), Utils.mkEntry((Object)"processing.guarantee", (Object)eosConfig), Utils.mkEntry((Object)"max.task.idle.ms", (Object)enforcedProcessingValue), Utils.mkEntry((Object)"default.deserialization.exception.handler", (Object)deserializationExceptionHandler)})));
    }

    @Before
    public void setup() {
        Mockito.when((Object)this.stateManager.taskId()).thenReturn((Object)this.taskId);
        Mockito.when((Object)this.stateManager.taskType()).thenReturn((Object)Task.TaskType.ACTIVE);
        this.consumer.assign(Arrays.asList(this.partition1, this.partition2));
        this.consumer.updateBeginningOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)0L), Utils.mkEntry((Object)this.partition2, (Object)0L)}));
        this.stateDirectory = new StateDirectory(StreamTaskTest.createConfig("100"), (Time)new MockTime(), true, false);
        this.stateDirectory.initializeProcessId();
    }

    @After
    public void cleanup() throws IOException {
        if (this.task != null) {
            try {
                this.task.suspend();
            }
            catch (IllegalStateException maybeSwallow) {
                if (!maybeSwallow.getMessage().startsWith("Illegal state CLOSED")) {
                    throw maybeSwallow;
                }
            }
            catch (RuntimeException runtimeException) {
                // empty catch block
            }
            this.task.closeDirty();
            this.task = null;
        }
        this.stateDirectory.close();
        Utils.delete((File)BASE_DIR);
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        Mockito.when((Object)this.stateDirectory.lock(this.taskId)).thenReturn((Object)false);
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), false);
        Assert.assertThrows(LockException.class, () -> this.task.initializeIfNeeded());
    }

    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        ((StateDirectory)Mockito.verify((Object)this.stateDirectory, (VerificationMode)Mockito.never())).lock((TaskId)Mockito.any());
    }

    @Test
    public void shouldAttemptToDeleteStateDirectoryWhenCloseDirtyAndEosEnabled() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        Mockito.when((Object)this.stateDirectory.lock(this.taskId)).thenReturn((Object)true);
        Mockito.when((Object)this.stateManager.baseDir()).thenReturn((Object)TestUtils.tempDirectory((String)"state_store"));
        InOrder inOrder = Mockito.inOrder((Object[])new Object[]{this.stateManager, this.stateDirectory});
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("exactly_once_v2", "100"), true, this.stateManager);
        this.task.suspend();
        this.task.closeDirty();
        this.task = null;
        ((ProcessorStateManager)inOrder.verify((Object)this.stateManager)).taskType();
        ((ProcessorStateManager)inOrder.verify((Object)this.stateManager)).registerGlobalStateStores(Collections.emptyList());
        ((ProcessorStateManager)inOrder.verify((Object)this.stateManager)).taskId();
        ((StateDirectory)inOrder.verify((Object)this.stateDirectory)).lock(this.taskId);
        ((ProcessorStateManager)inOrder.verify((Object)this.stateManager)).close();
        ((ProcessorStateManager)inOrder.verify((Object)this.stateManager)).baseDir();
        ((StateDirectory)inOrder.verify((Object)this.stateDirectory)).unlock(this.taskId);
    }

    @Test
    public void shouldResetOffsetsToLastCommittedForSpecifiedPartitions() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.addPartitionsForOffsetReset(Collections.singleton(this.partition1));
        this.consumer.seek(this.partition1, 5L);
        this.consumer.commitSync();
        this.consumer.seek(this.partition1, 10L);
        this.consumer.seek(this.partition2, 15L);
        java.util.function.Consumer resetter = (java.util.function.Consumer)Mockito.mock(java.util.function.Consumer.class);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(resetter);
        MatcherAssert.assertThat((Object)this.consumer.position(this.partition1), (Matcher)CoreMatchers.equalTo((Object)5L));
        MatcherAssert.assertThat((Object)this.consumer.position(this.partition2), (Matcher)CoreMatchers.equalTo((Object)15L));
        ((java.util.function.Consumer)Mockito.verify((Object)resetter)).accept(Collections.emptySet());
    }

    @Test
    public void shouldAutoOffsetResetIfNoCommittedOffsetFound() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.addPartitionsForOffsetReset(Collections.singleton(this.partition1));
        final AtomicReference<AssertionError> shouldNotSeek = new AtomicReference<AssertionError>();
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public void seek(TopicPartition partition, long offset) {
                AssertionError error = (AssertionError)shouldNotSeek.get();
                if (error != null) {
                    throw error;
                }
                super.seek(partition, offset);
            }
        };
        consumer.assign(Arrays.asList(this.partition1, this.partition2));
        consumer.updateBeginningOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)0L), Utils.mkEntry((Object)this.partition2, (Object)0L)}));
        consumer.seek(this.partition1, 5L);
        consumer.seek(this.partition2, 15L);
        shouldNotSeek.set(new AssertionError((Object)"Should not seek"));
        HashSet partitionsAtCall = new HashSet();
        this.task.initializeIfNeeded();
        this.task.completeRestoration(partitionsAtCall::addAll);
        MatcherAssert.assertThat((Object)consumer.position(this.partition1), (Matcher)CoreMatchers.equalTo((Object)5L));
        MatcherAssert.assertThat((Object)consumer.position(this.partition2), (Matcher)CoreMatchers.equalTo((Object)15L));
        MatcherAssert.assertThat(partitionsAtCall, (Matcher)CoreMatchers.equalTo(Collections.singleton(this.partition1)));
    }

    @Test
    public void shouldReadCommittedStreamTimeAndProcessorMetadataOnInitialize() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        ProcessorMetadata processorMetadata = new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)1L), Utils.mkEntry((Object)"key2", (Object)2L)}));
        this.consumer.commitSync(this.partitions.stream().collect(Collectors.toMap(Function.identity(), tp -> new OffsetAndMetadata(0L, new TopicPartitionMetadata(10L, processorMetadata).encode()))));
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        Assert.assertEquals((long)-1L, (long)this.task.streamTime());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertEquals((long)10L, (long)this.task.streamTime());
        Assert.assertEquals((long)1L, (long)this.task.processorContext().processorMetadataForKey("key1"));
        Assert.assertEquals((long)2L, (long)this.task.processorContext().processorMetadataForKey("key2"));
    }

    @Test
    public void shouldReadCommittedStreamTimeAndMergeProcessorMetadataOnInitialize() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        ProcessorMetadata processorMetadata1 = new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)1L), Utils.mkEntry((Object)"key2", (Object)2L)}));
        Map meta1 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(0L, new TopicPartitionMetadata(10L, processorMetadata1).encode()))});
        ProcessorMetadata processorMetadata2 = new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)10L), Utils.mkEntry((Object)"key3", (Object)30L)}));
        Map meta2 = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition2, (Object)new OffsetAndMetadata(0L, new TopicPartitionMetadata(20L, processorMetadata2).encode()))});
        this.consumer.commitSync(meta1);
        this.consumer.commitSync(meta2);
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        Assert.assertEquals((long)-1L, (long)this.task.streamTime());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertEquals((long)20L, (long)this.task.streamTime());
        Assert.assertEquals((long)10L, (long)this.task.processorContext().processorMetadataForKey("key1"));
        Assert.assertEquals((long)2L, (long)this.task.processorContext().processorMetadataForKey("key2"));
        Assert.assertEquals((long)30L, (long)this.task.processorContext().processorMetadataForKey("key3"));
    }

    @Test
    public void shouldTransitToRestoringThenRunningAfterCreation() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        Mockito.when((Object)this.stateDirectory.lock(this.taskId)).thenReturn((Object)true);
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 10L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        Assert.assertEquals((Object)Task.State.CREATED, (Object)this.task.state());
        this.task.initializeIfNeeded();
        Assert.assertEquals((Object)Task.State.RESTORING, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source2.initialized);
        this.task.initializeIfNeeded();
        Assert.assertEquals((Object)Task.State.RESTORING, (Object)this.task.state());
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertEquals((Object)Task.State.RUNNING, (Object)this.task.state());
        Assert.assertTrue((boolean)this.source1.initialized);
        Assert.assertTrue((boolean)this.source2.initialized);
    }

    @Test
    public void shouldProcessInOrder() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L, 101), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L, 102), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L, 103)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L, 201), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L, 202), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L, 203)));
        this.task.updateLags();
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals(Collections.singletonList(101), this.source1.values);
        Assert.assertEquals(Collections.emptyList(), this.source2.values);
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102), this.source1.values);
        Assert.assertEquals(Collections.emptyList(), this.source2.values);
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102), this.source1.values);
        Assert.assertEquals(Collections.singletonList(201), this.source2.values);
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Collections.singletonList(201), this.source2.values);
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Arrays.asList(201, 202), this.source2.values);
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertEquals(Arrays.asList(101, 102, 103), this.source1.values);
        Assert.assertEquals(Arrays.asList(201, 202, 203), this.source2.values);
    }

    @Test
    public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig(), "latest");
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosAlphaEnabled() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("exactly_once", "0"), "latest");
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldNotProcessRecordsAfterPrepareCommitWhenEosV2Enabled() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("exactly_once_v2", "0"), "latest");
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        this.task.prepareCommit();
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
        this.task.postCommit(false);
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertTrue((boolean)this.task.process(this.time.milliseconds()));
        Assert.assertFalse((boolean)this.task.process(this.time.milliseconds()));
    }

    @Test
    public void shouldRecordBufferedRecords() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        KafkaMetric metric = this.getMetric("active-buffer", "%s-count", this.task.id().toString());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.0));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L)));
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)2.0));
        Assert.assertTrue((boolean)this.task.process(0L));
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)1.0));
    }

    @Test
    public void shouldRecordProcessRatio() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        KafkaMetric metric = this.getMetric("active-process", "%s-ratio", this.task.id().toString());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.0));
        this.task.recordProcessBatchTime(10L);
        this.task.recordProcessBatchTime(15L);
        this.task.recordProcessTimeRatioAndBufferSize(100L, this.time.milliseconds());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.25));
        this.task.recordProcessBatchTime(10L);
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.25));
        this.task.recordProcessBatchTime(10L);
        this.task.recordProcessTimeRatioAndBufferSize(20L, this.time.milliseconds());
        MatcherAssert.assertThat((Object)metric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)1.0));
    }

    @Test
    public void shouldRecordE2ELatencyOnSourceNodeAndTerminalNodes() {
        this.time = new MockTime(0L, 0L, 0L);
        this.metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), (Time)this.time);
        MockSourceNode<Integer, Integer> evenKeyForwardingSourceNode = new MockSourceNode<Integer, Integer>(this.intDeserializer, this.intDeserializer){
            InternalProcessorContext<Integer, Integer> context;

            @Override
            public void init(InternalProcessorContext<Integer, Integer> context) {
                this.context = context;
                super.init(context);
            }

            @Override
            public void process(Record<Integer, Integer> record) {
                if ((Integer)record.key() % 2 == 0) {
                    this.context.forward(record);
                }
            }
        };
        this.task = this.createStatelessTaskWithForwardingTopology((SourceNode<Integer, Integer>)evenKeyForwardingSourceNode);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        String sourceNodeName = evenKeyForwardingSourceNode.name();
        String terminalNodeName = this.processorStreamTime.name();
        Metric sourceAvg = this.getProcessorMetric("record-e2e-latency", "%s-avg", this.task.id().toString(), sourceNodeName, "latest");
        Metric sourceMin = this.getProcessorMetric("record-e2e-latency", "%s-min", this.task.id().toString(), sourceNodeName, "latest");
        Metric sourceMax = this.getProcessorMetric("record-e2e-latency", "%s-max", this.task.id().toString(), sourceNodeName, "latest");
        Metric terminalAvg = this.getProcessorMetric("record-e2e-latency", "%s-avg", this.task.id().toString(), terminalNodeName, "latest");
        Metric terminalMin = this.getProcessorMetric("record-e2e-latency", "%s-min", this.task.id().toString(), terminalNodeName, "latest");
        Metric terminalMax = this.getProcessorMetric("record-e2e-latency", "%s-max", this.task.id().toString(), terminalNodeName, "latest");
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
        this.task.process(10L);
        MatcherAssert.assertThat((Object)sourceAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)sourceMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)sourceMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(1, 0L)));
        this.task.process(15L);
        MatcherAssert.assertThat((Object)sourceAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)12.5));
        MatcherAssert.assertThat((Object)sourceMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)sourceMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)15.0));
        MatcherAssert.assertThat((Object)terminalAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(2, 0L)));
        this.task.process(23L);
        MatcherAssert.assertThat((Object)sourceAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)16.0));
        MatcherAssert.assertThat((Object)sourceMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)sourceMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)23.0));
        MatcherAssert.assertThat((Object)terminalAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)16.5));
        MatcherAssert.assertThat((Object)terminalMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)23.0));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(3, 0L)));
        this.task.process(5L);
        MatcherAssert.assertThat((Object)sourceAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)13.25));
        MatcherAssert.assertThat((Object)sourceMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)5.0));
        MatcherAssert.assertThat((Object)sourceMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)23.0));
        MatcherAssert.assertThat((Object)terminalAvg.metricValue(), (Matcher)CoreMatchers.equalTo((Object)16.5));
        MatcherAssert.assertThat((Object)terminalMin.metricValue(), (Matcher)CoreMatchers.equalTo((Object)10.0));
        MatcherAssert.assertThat((Object)terminalMax.metricValue(), (Matcher)CoreMatchers.equalTo((Object)23.0));
    }

    @Test
    public void shouldRecordRestoredRecords() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        KafkaMetric totalMetric = this.getMetric("restore", "%s-total", this.task.id().toString());
        KafkaMetric rateMetric = this.getMetric("restore", "%s-rate", this.task.id().toString());
        KafkaMetric remainMetric = this.getMetric("restore", "%s-remaining-records-total", this.task.id().toString());
        MatcherAssert.assertThat((Object)totalMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.0));
        MatcherAssert.assertThat((Object)rateMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.0));
        MatcherAssert.assertThat((Object)remainMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)0.0));
        this.task.recordRestoration((Time)this.time, 100L, true);
        MatcherAssert.assertThat((Object)remainMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)100.0));
        this.task.recordRestoration((Time)this.time, 25L, false);
        MatcherAssert.assertThat((Object)totalMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)25.0));
        MatcherAssert.assertThat((Object)rateMetric.metricValue(), (Matcher)Matchers.not((Object)0.0));
        MatcherAssert.assertThat((Object)remainMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)75.0));
        this.task.recordRestoration((Time)this.time, 50L, false);
        MatcherAssert.assertThat((Object)totalMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)75.0));
        MatcherAssert.assertThat((Object)rateMetric.metricValue(), (Matcher)Matchers.not((Object)0.0));
        MatcherAssert.assertThat((Object)remainMetric.metricValue(), (Matcher)CoreMatchers.equalTo((Object)25.0));
    }

    @Test
    public void shouldThrowOnTimeoutExceptionAndBufferRecordForRetryIfEosDisabled() {
        this.createTimeoutTask("at_least_once");
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
        TimeoutException exception = (TimeoutException)Assert.assertThrows(TimeoutException.class, () -> this.task.process(0L));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Kaboom!"));
        MatcherAssert.assertThat((Object)this.task.commitNeeded(), (Matcher)CoreMatchers.equalTo((Object)false));
        MatcherAssert.assertThat((Object)this.task.hasRecordsQueued(), (Matcher)CoreMatchers.equalTo((Object)false));
        TimeoutException nextException = (TimeoutException)Assert.assertThrows(TimeoutException.class, () -> this.task.process(0L));
        MatcherAssert.assertThat((Object)nextException.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Kaboom!"));
    }

    @Test
    public void shouldThrowTaskCorruptedExceptionOnTimeoutExceptionIfEosEnabled() {
        this.createTimeoutTask("exactly_once_v2");
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(0, 0L)));
        Assert.assertThrows(TaskCorruptedException.class, () -> this.task.process(0L));
    }

    @Test
    public void testMetrics() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        Assert.assertNotNull((Object)this.getMetric("enforced-processing", "%s-rate", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("enforced-processing", "%s-total", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("record-lateness", "%s-avg", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("record-lateness", "%s-max", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("active-process", "%s-ratio", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("active-buffer", "%s-count", this.task.id().toString()));
        this.testMetricsForBuiltInMetricsVersionLatest();
        JmxReporter reporter = new JmxReporter();
        KafkaMetricsContext metricsContext = new KafkaMetricsContext("kafka.streams");
        reporter.contextChange((MetricsContext)metricsContext);
        this.metrics.addReporter((MetricsReporter)reporter);
        String threadIdTag = "thread-id";
        Assert.assertTrue((boolean)reporter.containsMbean(String.format("kafka.streams:type=stream-task-metrics,%s=%s,task-id=%s", "thread-id", this.threadId, this.task.id())));
    }

    private void testMetricsForBuiltInMetricsVersionLatest() {
        String builtInMetricsVersion = "latest";
        Assert.assertNull((Object)this.getMetric("commit", "%s-latency-avg", "all"));
        Assert.assertNull((Object)this.getMetric("commit", "%s-latency-max", "all"));
        Assert.assertNull((Object)this.getMetric("commit", "%s-rate", "all"));
        Assert.assertNull((Object)this.getMetric("commit", "%s-total", "all"));
        Assert.assertNotNull((Object)this.getMetric("process", "%s-latency-max", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("process", "%s-latency-avg", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("punctuate", "%s-latency-avg", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("punctuate", "%s-latency-max", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("punctuate", "%s-rate", this.task.id().toString()));
        Assert.assertNotNull((Object)this.getMetric("punctuate", "%s-total", this.task.id().toString()));
    }

    private KafkaMetric getMetric(String operation, String nameFormat, String taskId) {
        String descriptionIsNotVerified = "";
        return (KafkaMetric)this.metrics.metrics().get(this.metrics.metricName(String.format(nameFormat, operation), "stream-task-metrics", "", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"task-id", (Object)taskId), Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName())})));
    }

    private Metric getProcessorMetric(String operation, String nameFormat, String taskId, String processorNodeId, String builtInMetricsVersion) {
        return StreamsTestUtils.getMetricByNameFilterByTags(this.metrics.metrics(), String.format(nameFormat, operation), "stream-processor-node-metrics", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"task-id", (Object)taskId), Utils.mkEntry((Object)"processor-node-id", (Object)processorNodeId), Utils.mkEntry((Object)"thread-id", (Object)Thread.currentThread().getName())}));
    }

    @Test
    public void shouldPauseAndResumeBasedOnBufferedRecords() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 55L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 65L)));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 40L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 50L)));
        Assert.assertEquals((long)2L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition1));
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)2L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition1));
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertEquals((long)1L, (long)this.consumer.paused().size());
        Assert.assertTrue((boolean)this.consumer.paused().contains(this.partition2));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        Assert.assertEquals((long)0L, (long)this.consumer.paused().size());
    }

    @Test
    public void shouldPunctuateOnceStreamTimeAfterGap() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 142L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 155L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 160L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 145L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 159L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 161L)));
        this.task.updateLags();
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)7L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)0L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)6L, (long)this.task.numBuffered());
        Assert.assertEquals((long)1L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)5L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)1L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)4L, (long)this.task.numBuffered());
        Assert.assertEquals((long)2L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)3L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)2L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)2L, (long)this.task.numBuffered());
        Assert.assertEquals((long)3L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)1L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)3L, (long)this.source2.numReceived);
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertEquals((long)0L, (long)this.task.numBuffered());
        Assert.assertEquals((long)4L, (long)this.source1.numReceived);
        Assert.assertEquals((long)4L, (long)this.source2.numReceived);
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L, 142L, 155L, 160L);
    }

    @Test
    public void shouldRespectPunctuateCancellationStreamTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 40L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 25L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L)));
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.process(0L));
        this.processorStreamTime.mockProcessor.scheduleCancellable().cancel();
        Assert.assertFalse((boolean)this.task.canPunctuateStreamTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateStreamTime());
        this.processorStreamTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.STREAM_TIME, 20L);
    }

    @Test
    public void shouldRespectPunctuateCancellationSystemTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.scheduleCancellable().cancel();
        this.time.sleep(10L);
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L);
    }

    @Test
    public void shouldRespectCommitNeeded() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertFalse((boolean)this.task.commitNeeded());
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse((boolean)this.task.commitNeeded());
        Assert.assertTrue((boolean)this.task.canPunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateStreamTime());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse((boolean)this.task.commitNeeded());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.prepareCommit();
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.postCommit(true);
        Assert.assertFalse((boolean)this.task.commitNeeded());
    }

    @Test
    public void shouldCommitNextOffsetAndProcessorMetadataFromQueueIfAvailable() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 3L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 5L)));
        this.task.process(0L);
        this.processorStreamTime.mockProcessor.addProcessorMetadata("key1", 100L);
        this.task.process(0L);
        this.processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L);
        Map offsetsAndMetadata = this.task.prepareCommit();
        TopicPartitionMetadata expected = new TopicPartitionMetadata(3L, new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)100L), Utils.mkEntry((Object)"key2", (Object)200L)})));
        MatcherAssert.assertThat((Object)offsetsAndMetadata, (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(5L, expected.encode()))})));
    }

    @Test
    public void shouldCommitConsumerPositionIfRecordQueueIsEmpty() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 2L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition2, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L)));
        this.task.process(0L);
        TopicPartitionMetadata metadata = new TopicPartitionMetadata(0L, new ProcessorMetadata());
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(3L, metadata.encode()))})));
        this.task.postCommit(false);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.consumer.poll(Duration.ZERO);
        this.task.updateLags();
        this.task.process(0L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(3L, metadata.encode())), Utils.mkEntry((Object)this.partition2, (Object)new OffsetAndMetadata(1L, metadata.encode()))})));
        this.task.postCommit(false);
        Assert.assertFalse((boolean)this.task.commitNeeded());
    }

    @Test
    public void shouldCommitOldProcessorMetadataWhenNotDirty() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L));
        this.consumer.addRecord(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 1L));
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 1L)));
        this.task.updateLags();
        this.task.process(0L);
        this.processorStreamTime.mockProcessor.addProcessorMetadata("key1", 100L);
        TopicPartitionMetadata expectedMetadata1 = new TopicPartitionMetadata(0L, new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)100L)})));
        TopicPartitionMetadata expectedMetadata2 = new TopicPartitionMetadata(-1L, new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)100L)})));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(1L, expectedMetadata1.encode())), Utils.mkEntry((Object)this.partition2, (Object)new OffsetAndMetadata(2L, expectedMetadata2.encode()))})));
        this.task.postCommit(false);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.consumer.poll(Duration.ZERO);
        this.task.process(0L);
        TopicPartitionMetadata expectedMetadata3 = new TopicPartitionMetadata(1L, new ProcessorMetadata(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"key1", (Object)100L)})));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(2L, expectedMetadata3.encode()))})));
        this.task.postCommit(false);
        Assert.assertFalse((boolean)this.task.commitNeeded());
    }

    @Test
    public void shouldFailOnCommitIfTaskIsClosed() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.suspend();
        this.task.transitionTo(Task.State.CLOSED);
        IllegalStateException thrown = (IllegalStateException)Assert.assertThrows(IllegalStateException.class, () -> ((StreamTask)this.task).prepareCommit());
        MatcherAssert.assertThat((Object)thrown.getMessage(), (Matcher)Matchers.is((Object)"Illegal state CLOSED while preparing active task 0_0 for committing"));
    }

    @Test
    public void shouldRespectCommitRequested() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.requestCommit();
        Assert.assertTrue((boolean)this.task.commitRequested());
    }

    @Test
    public void shouldBeProcessableIfAllPartitionsBuffered() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        MatcherAssert.assertThat((String)"task is not idling", (!this.task.timeCurrentIdlingStarted().isPresent() ? 1 : 0) != 0);
        Assert.assertFalse((boolean)this.task.process(0L));
        byte[] bytes = ByteBuffer.allocate(4).putInt(1).array();
        this.task.addRecords(this.partition1, Collections.singleton(new ConsumerRecord("topic1", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertFalse((boolean)this.task.process(0L));
        MatcherAssert.assertThat((String)"task is idling", (boolean)this.task.timeCurrentIdlingStarted().isPresent());
        this.task.addRecords(this.partition2, Collections.singleton(new ConsumerRecord("topic2", 1, 0L, (Object)bytes, (Object)bytes)));
        Assert.assertTrue((boolean)this.task.process(0L));
        MatcherAssert.assertThat((String)"task is not idling", (!this.task.timeCurrentIdlingStarted().isPresent() ? 1 : 0) != 0);
    }

    @Test
    public void shouldBeRecordIdlingTimeIfSuspended() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.suspend();
        MatcherAssert.assertThat((String)"task is idling", (boolean)this.task.timeCurrentIdlingStarted().isPresent());
        this.task.resume();
        MatcherAssert.assertThat((String)"task is not idling", (!this.task.timeCurrentIdlingStarted().isPresent() ? 1 : 0) != 0);
    }

    @Test
    public void shouldPunctuateSystemTimeWhenIntervalElapsed() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long now = this.time.milliseconds();
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(20L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 10L, now + 20L, now + 30L, now + 50L);
    }

    @Test
    public void shouldNotPunctuateSystemTimeWhenIntervalNotElapsed() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(9L);
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, new long[0]);
    }

    @Test
    public void shouldPunctuateOnceSystemTimeAfterGap() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long now = this.time.milliseconds();
        this.time.sleep(100L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(10L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(12L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(7L);
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(1L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(105L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.time.sleep(5L);
        Assert.assertTrue((boolean)this.task.canPunctuateSystemTime());
        Assert.assertTrue((boolean)this.task.maybePunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.canPunctuateSystemTime());
        Assert.assertFalse((boolean)this.task.maybePunctuateSystemTime());
        this.processorSystemTime.mockProcessor.checkAndClearPunctuateResult(PunctuationType.WALL_CLOCK_TIME, now + 100L, now + 110L, now + 122L, now + 130L, now + 235L, now + 240L);
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingStreamTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        try {
            this.task.punctuate(this.processorStreamTime, 1L, PunctuationType.STREAM_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorStreamTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldWrapKafkaExceptionsWithStreamsExceptionAndAddContextWhenPunctuatingWallClockTimeTime() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        try {
            this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                throw new KafkaException("KABOOM!");
            });
            Assert.fail((String)"Should've thrown StreamsException");
        }
        catch (StreamsException e) {
            String message = e.getMessage();
            Assert.assertTrue((String)("message=" + message + " should contain processor"), (boolean)message.contains("processor '" + this.processorSystemTime.name() + "'"));
            MatcherAssert.assertThat((Object)this.task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
        }
    }

    @Test
    public void shouldNotShareHeadersBetweenPunctuateIterations() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> this.task.processorContext().headers().add("dummy", null));
        this.task.punctuate(this.processorSystemTime, 1L, PunctuationType.WALL_CLOCK_TIME, timestamp -> Assert.assertFalse((boolean)this.task.processorContext().headers().iterator().hasNext()));
    }

    @Test
    public void shouldWrapKafkaExceptionWithStreamsExceptionWhenProcess() {
        this.task = this.createFaultyStatefulTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 20L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 30L)));
        this.task.addRecords(this.partition2, Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 5L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 35L), this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 45L)));
        MatcherAssert.assertThat((String)"Map did not contain the partitions", (this.task.highWaterMark().containsKey(this.partition1) && this.task.highWaterMark().containsKey(this.partition2) ? 1 : 0) != 0);
        Assert.assertThrows(StreamsException.class, () -> this.task.process(0L));
    }

    @Test
    public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        Mockito.when((Object)this.stateDirectory.lock(this.taskId)).thenReturn((Object)true);
        this.task = this.createDisconnectedTask(StreamTaskTest.createConfig("100"));
        this.task.transitionTo(Task.State.RESTORING);
        Assert.assertThrows(TimeoutException.class, () -> this.task.completeRestoration(noOpResetter -> {}));
    }

    @Test
    public void shouldReInitializeTopologyWhenResuming() {
        this.stateDirectory.close();
        this.stateDirectory = (StateDirectory)Mockito.mock(StateDirectory.class);
        Mockito.when((Object)this.stateDirectory.lock(this.taskId)).thenReturn((Object)true);
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.suspend();
        Assert.assertEquals((Object)Task.State.SUSPENDED, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source2.initialized);
        this.task.resume();
        Assert.assertEquals((Object)Task.State.RESTORING, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source2.initialized);
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertEquals((Object)Task.State.RUNNING, (Object)this.task.state());
        Assert.assertTrue((boolean)this.source1.initialized);
        Assert.assertTrue((boolean)this.source2.initialized);
        MatcherAssert.assertThat((String)"Map did not contain the partition", (boolean)this.task.highWaterMark().containsKey(this.partition1));
        ((RecordCollector)Mockito.verify((Object)this.recordCollector)).offsets();
    }

    @Test
    public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
        Long offset = 543L;
        Mockito.when((Object)this.recordCollector.offsets()).thenReturn(Collections.singletonMap(this.changelogPartition, offset));
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 10L)).thenReturn(Collections.singletonMap(this.changelogPartition, 10L)).thenReturn(Collections.singletonMap(this.changelogPartition, 20L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.prepareCommit();
        this.task.postCommit(false);
        MatcherAssert.assertThat((String)"Map was empty", (this.task.highWaterMark().size() == 2 ? 1 : 0) != 0);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).checkpoint();
    }

    @Test
    public void shouldCheckpointOffsetsOnCommitIfSnapshotMuchChanged() {
        Long offset = 543L;
        Mockito.when((Object)this.recordCollector.offsets()).thenReturn(Collections.singletonMap(this.changelogPartition, offset));
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 0L)).thenReturn(Collections.singletonMap(this.changelogPartition, 10L)).thenReturn(Collections.singletonMap(this.changelogPartition, 12000L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.prepareCommit();
        this.task.postCommit(false);
        MatcherAssert.assertThat((String)"Map was empty", (this.task.highWaterMark().size() == 2 ? 1 : 0) != 0);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)3))).checkpoint();
    }

    @Test
    public void shouldNotCheckpointOffsetsOnCommitIfEosIsEnabled() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("exactly_once_v2", "100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.prepareCommit();
        this.task.postCommit(false);
        File checkpointFile = new File(this.stateDirectory.getOrCreateDirectoryForTask(this.taskId), ".checkpoint");
        Assert.assertFalse((boolean)checkpointFile.exists());
    }

    @Test
    public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.processorContext().setCurrentNode(this.processorStreamTime);
        try {
            this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
            Assert.fail((String)"Should throw illegal state exception as current node is not null");
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void shouldCallPunctuateOnPassedInProcessorNode() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)5L));
        this.task.punctuate(this.processorStreamTime, 10L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.punctuatedAt, (Matcher)CoreMatchers.equalTo((Object)10L));
    }

    @Test
    public void shouldSetProcessorNodeOnContextBackToNullAfterSuccessfulPunctuate() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.punctuate(this.processorStreamTime, 5L, PunctuationType.STREAM_TIME, this.punctuator);
        MatcherAssert.assertThat((Object)this.task.processorContext().currentNode(), (Matcher)CoreMatchers.nullValue());
    }

    @Test
    public void shouldThrowIllegalStateExceptionOnScheduleIfCurrentNodeIsNull() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        Assert.assertThrows(IllegalStateException.class, () -> this.task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> {}));
    }

    @Test
    public void shouldNotThrowExceptionOnScheduleIfCurrentNodeIsNotNull() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.processorContext().setCurrentNode(this.processorStreamTime);
        this.task.schedule(1L, PunctuationType.STREAM_TIME, timestamp -> {});
    }

    @Test
    public void shouldCloseStateManagerEvenDuringFailureOnUncleanTaskClose() {
        this.task = this.createFaultyStatefulTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertThrows(RuntimeException.class, () -> this.task.suspend());
        this.task.closeDirty();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).close();
    }

    @Test
    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
        TopicPartition repartition = new TopicPartition("repartition", 1);
        ProcessorTopology topology = StreamTaskTest.withRepartitionTopics(Arrays.asList(new ProcessorNode[]{this.source1, this.source2}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)repartition.topic(), this.source2)}), Collections.singleton(repartition.topic()));
        this.consumer.assign(Arrays.asList(this.partition1, repartition));
        this.consumer.updateBeginningOffsets(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)repartition, (Object)0L)}));
        StreamsConfig config = StreamTaskTest.createConfig();
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        this.task = new StreamTask(this.taskId, Utils.mkSet((Object[])new TopicPartition[]{this.partition1, repartition}), topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 5L)));
        this.task.addRecords(repartition, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(repartition, 10L)));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.task.updateLags();
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertTrue((boolean)this.task.process(0L));
        this.task.prepareCommit();
        Map map = this.task.purgeableOffsets();
        MatcherAssert.assertThat((Object)map, (Matcher)CoreMatchers.equalTo(Collections.singletonMap(repartition, 11L)));
    }

    @Test
    public void shouldThrowStreamsExceptionWhenFetchCommittedFailed() {
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new KafkaException("KABOOM!");
            }
        };
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)consumer);
        this.task.transitionTo(Task.State.RESTORING);
        Assert.assertThrows(StreamsException.class, () -> this.task.completeRestoration(noOpResetter -> {}));
    }

    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);
        Assert.assertThrows(IllegalStateException.class, () -> ((StreamTask)this.task).prepareCommit());
    }

    @Test
    public void shouldThrowIfPostCommittingOnIllegalState() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("100"));
        this.task.transitionTo(Task.State.SUSPENDED);
        this.task.transitionTo(Task.State.CLOSED);
        Assert.assertThrows(IllegalStateException.class, () -> this.task.postCommit(true));
    }

    @Test
    public void shouldSkipCheckpointingSuspendedCreatedTask() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.suspend();
        this.task.postCommit(true);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.never())).checkpoint();
    }

    @Test
    public void shouldCheckpointForSuspendedTask() {
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 1L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.postCommit(true);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
    }

    @Test
    public void shouldNotCheckpointForSuspendedRunningTaskWithSmallProgress() {
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 0L)).thenReturn(Collections.singletonMap(this.partition1, 1L)).thenReturn(Collections.singletonMap(this.partition1, 2L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.suspend();
        this.task.postCommit(false);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
    }

    @Test
    public void shouldCheckpointForSuspendedRunningTaskWithLargeProgress() {
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 0L)).thenReturn(Collections.singletonMap(this.partition1, 12000L)).thenReturn(Collections.singletonMap(this.partition1, 24000L));
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.suspend();
        this.task.postCommit(false);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)3))).checkpoint();
    }

    @Test
    public void shouldCheckpointWhileUpdateSnapshotWithTheConsumedOffsetsForSuspendedRunningTask() {
        Map<TopicPartition, Long> checkpointableOffsets = Collections.singletonMap(this.partition1, 1L);
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.emptyMap()).thenReturn(checkpointableOffsets);
        Mockito.when((Object)this.recordCollector.offsets()).thenReturn(checkpointableOffsets);
        this.task = this.createStatefulTask(StreamTaskTest.createConfig(), true);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singleton(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 10L)));
        this.task.addRecords(this.partition2, Collections.singleton(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 10L)));
        this.task.process(100L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.suspend();
        this.task.postCommit(true);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).checkpoint();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).updateChangelogOffsets(checkpointableOffsets);
        ((RecordCollector)Mockito.verify((Object)this.recordCollector, (VerificationMode)Mockito.times((int)2))).offsets();
    }

    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 50L));
        Mockito.when((Object)this.stateManager.changelogPartitions()).thenReturn(Collections.singleton(this.partition1));
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        Assert.assertEquals(Collections.singletonMap(this.partition1, 50L), (Object)this.task.changelogOffsets());
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertEquals(Collections.singletonMap(this.partition1, -2L), (Object)this.task.changelogOffsets());
    }

    @Test
    public void shouldNotCheckpointOnCloseCreated() {
        MetricName metricName = this.setupCloseTaskMetric();
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.suspend();
        this.task.closeClean();
        Assert.assertEquals((Object)Task.State.CLOSED, (Object)this.task.state());
        Assert.assertFalse((boolean)this.source1.initialized);
        Assert.assertFalse((boolean)this.source1.closed);
        double expectedCloseTaskMetric = 1.0;
        this.verifyCloseTaskMetric(1.0, this.streamsMetrics, metricName);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.never())).flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.never())).checkpoint();
    }

    @Test
    public void shouldCheckpointOnCloseRestoringIfNoProgress() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.closeClean();
        Assert.assertEquals((Object)Task.State.CLOSED, (Object)this.task.state());
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).checkpoint();
    }

    @Test
    public void shouldAlwaysCheckpointStateIfEnforced() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.maybeCheckpoint(true);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
    }

    @Test
    public void shouldOnlyCheckpointStateWithBigAdvanceIfNotEnforced() {
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 50L)).thenReturn(Collections.singletonMap(this.partition1, 11000L)).thenReturn(Collections.singletonMap(this.partition1, 12000L));
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.maybeCheckpoint(false);
        Assert.assertTrue((boolean)this.task.offsetSnapshotSinceLastFlush.isEmpty());
        this.task.maybeCheckpoint(false);
        Assert.assertEquals(Collections.singletonMap(this.partition1, 11000L), (Object)this.task.offsetSnapshotSinceLastFlush);
        this.task.maybeCheckpoint(false);
        Assert.assertEquals(Collections.singletonMap(this.partition1, 11000L), (Object)this.task.offsetSnapshotSinceLastFlush);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
    }

    @Test
    public void shouldCheckpointOffsetsOnPostCommit() {
        long offset = 543L;
        long consumedOffset = 345L;
        Mockito.when((Object)this.recordCollector.offsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 543L));
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 10543L)).thenReturn(Collections.singletonMap(this.partition1, 12543L));
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig(), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 345L)));
        this.task.process(100L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(false);
        Assert.assertEquals((Object)Task.State.SUSPENDED, (Object)this.task.state());
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
    }

    @Test
    public void shouldThrowExceptionOnCloseCleanError() {
        long offset = 543L;
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 543L));
        ((ProcessorStateManager)Mockito.doThrow((Throwable[])new Throwable[]{new ProcessorStateException("KABOOM!")}).when((Object)this.stateManager)).close();
        MetricName metricName = this.setupCloseTaskMetric();
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 543L)));
        this.task.process(100L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        Assert.assertThrows(ProcessorStateException.class, () -> this.task.closeClean());
        double expectedCloseTaskMetric = 0.0;
        this.verifyCloseTaskMetric(0.0, this.streamsMetrics, metricName);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.times((int)2))).checkpoint();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).close();
    }

    @Test
    public void shouldThrowOnCloseCleanFlushError() {
        long offset = 543L;
        Mockito.when((Object)this.recordCollector.offsets()).thenReturn(Collections.singletonMap(this.changelogPartition, 543L));
        ((ProcessorStateManager)Mockito.doThrow((Throwable[])new Throwable[]{new ProcessorStateException("KABOOM!")}).when((Object)this.stateManager)).flushCache();
        MetricName metricName = this.setupCloseTaskMetric();
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 543L)));
        this.task.process(100L);
        Assert.assertThrows(ProcessorStateException.class, () -> ((StreamTask)this.task).prepareCommit());
        Assert.assertEquals((Object)Task.State.RUNNING, (Object)this.task.state());
        double expectedCloseTaskMetric = 0.0;
        this.verifyCloseTaskMetric(0.0, this.streamsMetrics, metricName);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).checkpoint();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.never())).close();
    }

    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        long offset = 54300L;
        ((ProcessorStateManager)Mockito.doThrow((Throwable[])new Throwable[]{new ProcessorStateException("KABOOM!")}).when((Object)this.stateManager)).checkpoint();
        Mockito.when((Object)this.stateManager.changelogOffsets()).thenReturn(Collections.singletonMap(this.partition1, 54300L));
        MetricName metricName = this.setupCloseTaskMetric();
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 54300L)));
        this.task.process(100L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        this.task.suspend();
        this.task.prepareCommit();
        Assert.assertThrows(ProcessorStateException.class, () -> this.task.postCommit(true));
        Assert.assertEquals((Object)Task.State.SUSPENDED, (Object)this.task.state());
        double expectedCloseTaskMetric = 0.0;
        this.verifyCloseTaskMetric(0.0, this.streamsMetrics, metricName);
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager, (VerificationMode)Mockito.never())).close();
    }

    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        ((ProcessorStateManager)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("KABOOM!")}).when((Object)this.stateManager)).close();
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.initializeIfNeeded();
        this.task.suspend();
        Assertions.assertDoesNotThrow(() -> this.task.closeDirty());
    }

    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.not((Matcher)Matchers.empty()));
        this.task.closeClean();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.empty());
    }

    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.not((Matcher)Matchers.empty()));
        this.task.closeDirty();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.empty());
    }

    @Test
    public void shouldUnregisterMetricsAndCloseInPrepareRecycle() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.suspend();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.not((Matcher)Matchers.empty()));
        this.task.prepareRecycle();
        MatcherAssert.assertThat(this.getTaskMetrics(), (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).recycle();
    }

    @Test
    public void shouldFlushStateManagerAndRecordCollector() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), false);
        this.task.flush();
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).flushCache();
        ((RecordCollector)Mockito.verify((Object)this.recordCollector)).flush();
    }

    @Test
    public void shouldClearCommitStatusesInCloseDirty() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue((boolean)this.task.process(0L));
        this.task.requestCommit();
        this.task.suspend();
        MatcherAssert.assertThat((Object)this.task.commitNeeded(), (Matcher)Matchers.is((Object)true));
        MatcherAssert.assertThat((Object)this.task.commitRequested(), (Matcher)Matchers.is((Object)true));
        this.task.closeDirty();
        MatcherAssert.assertThat((Object)this.task.commitNeeded(), (Matcher)Matchers.is((Object)false));
        MatcherAssert.assertThat((Object)this.task.commitRequested(), (Matcher)Matchers.is((Object)false));
    }

    @Test
    public void closeShouldBeIdempotent() {
        this.task = this.createOptimizedStatefulTask(StreamTaskTest.createConfig("100"), (Consumer<byte[], byte[]>)this.consumer);
        this.task.suspend();
        this.task.closeClean();
        this.task.closeClean();
        this.task.closeDirty();
    }

    @Test
    public void shouldUpdatePartitions() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        HashSet<TopicPartition> newPartitions = new HashSet<TopicPartition>(this.task.inputPartitions());
        newPartitions.add(new TopicPartition("newTopic", 0));
        this.task.updateInputPartitions(newPartitions, Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.source1.name(), Arrays.asList("topic1", "newTopic")), Utils.mkEntry((Object)this.source2.name(), Collections.singletonList("topic2"))}));
        MatcherAssert.assertThat((Object)this.task.inputPartitions(), (Matcher)CoreMatchers.equalTo(newPartitions));
    }

    @Test
    public void shouldThrowIfCleanClosingDirtyTask() {
        this.task = this.createSingleSourceStateless(StreamTaskTest.createConfig("at_least_once", "0"), "latest");
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        Assert.assertTrue((boolean)this.task.process(0L));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        Assert.assertThrows(TaskMigratedException.class, () -> this.task.closeClean());
    }

    @Test
    public void shouldThrowIfRecyclingDirtyTask() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        this.task.addRecords(this.partition1, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, 0L)));
        this.task.addRecords(this.partition2, Collections.singletonList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition2, 0L)));
        this.task.process(0L);
        Assert.assertTrue((boolean)this.task.commitNeeded());
        Assert.assertThrows(TaskMigratedException.class, () -> this.task.prepareRecycle());
    }

    @Test
    public void shouldPrepareRecycleSuspendedTask() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        Assert.assertThrows(IllegalStateException.class, () -> this.task.prepareRecycle());
        this.task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> this.task.prepareRecycle());
        this.task.completeRestoration(noOpResetter -> {});
        Assert.assertThrows(IllegalStateException.class, () -> this.task.prepareRecycle());
        this.task.suspend();
        this.task.prepareRecycle();
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)Matchers.is((Object)Task.State.CLOSED));
        ((ProcessorStateManager)Mockito.verify((Object)this.stateManager)).recycle();
        ((RecordCollector)Mockito.verify((Object)this.recordCollector)).closeClean();
    }

    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.CREATED));
        this.task.suspend();
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.SUSPENDED));
    }

    @Test
    public void shouldAlwaysSuspendRestoringTasks() {
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("100"), true);
        this.task.initializeIfNeeded();
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.RESTORING));
        this.task.suspend();
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.SUSPENDED));
    }

    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        this.task = this.createFaultyStatefulTask(StreamTaskTest.createConfig("100"));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.RUNNING));
        Assert.assertThrows(RuntimeException.class, () -> this.task.suspend());
        MatcherAssert.assertThat((Object)this.task.state(), (Matcher)CoreMatchers.equalTo((Object)Task.State.SUSPENDED));
    }

    @Test
    public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() {
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, StreamTaskTest.createConfig("100"), this.stateManager, this.streamsMetrics, null);
        StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)this.time);
        ProcessorTopology topology = StreamTaskTest.withSources(Collections.emptyList(), Utils.mkMap((Map.Entry[])new Map.Entry[0]));
        TopologyException exception = (TopologyException)Assert.assertThrows(TopologyException.class, () -> this.lambda$shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic$73(topology, metrics, (InternalProcessorContext)context));
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)CoreMatchers.equalTo((Object)"Invalid topology: Topic topic1 is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order."));
    }

    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.maybeInitTaskTimeoutOrThrow(0L, null);
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).toMillis(), null);
        StreamsException thrown = (StreamsException)Assert.assertThrows(StreamsException.class, () -> this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), null));
        MatcherAssert.assertThat((Object)thrown.getCause(), (Matcher)Matchers.isA(TimeoutException.class));
    }

    @Test
    public void shouldClearTaskTimeout() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig());
        this.task.maybeInitTaskTimeoutOrThrow(0L, null);
        this.task.clearTaskTimeout();
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), null);
    }

    @Test
    public void shouldUpdateOffsetIfAllRecordsAreCorrupted() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long offset = -1L;
        List<ConsumerRecord> records = Arrays.asList(this.getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset), this.getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
        this.consumer.addRecord(records.get(0));
        this.consumer.addRecord(records.get(1));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, records);
        this.task.updateLags();
        Assert.assertTrue((boolean)this.task.process(offset));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(offset + 1L, new TopicPartitionMetadata(-1L, new ProcessorMetadata()).encode()))})));
    }

    @Test
    public void shouldUpdateOffsetIfValidRecordFollowsCorrupted() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long offset = -1L;
        List<ConsumerRecord> records = Arrays.asList(this.getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset), this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, ++offset));
        this.consumer.addRecord(records.get(0));
        this.consumer.addRecord(records.get(1));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, records);
        this.task.updateLags();
        Assert.assertTrue((boolean)this.task.process(offset));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(offset + 1L, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))})));
    }

    @Test
    public void shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
        this.task = this.createStatelessTask(StreamTaskTest.createConfig("at_least_once", "0", LogAndContinueExceptionHandler.class.getName()));
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        long offset = -1L;
        List<ConsumerRecord> records = Arrays.asList(this.getConsumerRecordWithOffsetAsTimestamp(this.partition1, ++offset), this.getCorruptedConsumerRecordWithOffsetAsTimestamp(++offset));
        this.consumer.addRecord(records.get(0));
        this.consumer.addRecord(records.get(1));
        this.task.resumePollingForPartitionsWithAvailableSpace();
        this.consumer.poll(Duration.ZERO);
        this.task.addRecords(this.partition1, records);
        this.task.updateLags();
        Assert.assertTrue((boolean)this.task.process(offset));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(1L, new TopicPartitionMetadata(0L, new ProcessorMetadata()).encode()))})));
        Assert.assertTrue((boolean)this.task.process(offset));
        Assert.assertTrue((boolean)this.task.commitNeeded());
        MatcherAssert.assertThat((Object)this.task.prepareCommit(), (Matcher)CoreMatchers.equalTo((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.partition1, (Object)new OffsetAndMetadata(2L, new TopicPartitionMetadata(0L, new ProcessorMetadata()).encode()))})));
    }

    @Test
    public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
        ProcessorStateManager processorStateManager = this.mockStateManager();
        this.recordCollector = (RecordCollector)Mockito.mock(RecordCollectorImpl.class);
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("at_least_once", "100"), true, processorStateManager);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        ((ProcessorStateManager)Mockito.verify((Object)processorStateManager)).checkpoint();
    }

    @Test
    public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
        ProcessorStateManager processorStateManager = this.mockStateManager();
        this.recordCollector = (RecordCollector)Mockito.mock(RecordCollectorImpl.class);
        this.task = this.createStatefulTask(StreamTaskTest.createConfig("exactly_once_v2", "100"), true, processorStateManager);
        this.task.initializeIfNeeded();
        this.task.completeRestoration(noOpResetter -> {});
        ((ProcessorStateManager)Mockito.verify((Object)processorStateManager, (VerificationMode)Mockito.never())).checkpoint();
        ((ProcessorStateManager)Mockito.verify((Object)processorStateManager, (VerificationMode)Mockito.never())).changelogOffsets();
        ((RecordCollector)Mockito.verify((Object)this.recordCollector, (VerificationMode)Mockito.never())).offsets();
    }

    private ProcessorStateManager mockStateManager() {
        ProcessorStateManager manager = (ProcessorStateManager)Mockito.mock(ProcessorStateManager.class);
        ((ProcessorStateManager)Mockito.doReturn((Object)Task.TaskType.ACTIVE).when((Object)manager)).taskType();
        ((ProcessorStateManager)Mockito.doReturn((Object)this.taskId).when((Object)manager)).taskId();
        return manager;
    }

    private List<MetricName> getTaskMetrics() {
        return this.metrics.metrics().keySet().stream().filter(m -> m.tags().containsKey("task-id")).collect(Collectors.toList());
    }

    private StreamTask createOptimizedStatefulTask(StreamsConfig config, Consumer<byte[], byte[]> consumer) {
        MockKeyValueStore stateStore = new MockKeyValueStore("store", true);
        ProcessorTopology topology = ProcessorTopologyFactories.with(Collections.singletonList(this.source1), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1)}), Collections.singletonList(stateStore), Collections.singletonMap("store", "topic1"));
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, Utils.mkSet((Object[])new TopicPartition[]{this.partition1}), topology, consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createDisconnectedTask(StreamsConfig config) {
        MockKeyValueStore stateStore = new MockKeyValueStore("store", false);
        ProcessorTopology topology = ProcessorTopologyFactories.with(Arrays.asList(new ProcessorNode[]{this.source1, this.source2}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}), Collections.singletonList(stateStore), Collections.emptyMap());
        MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST){

            public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) {
                throw new TimeoutException("KABOOM!");
            }
        };
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, this.partitions, topology, (Consumer)consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createFaultyStatefulTask(StreamsConfig config) {
        ProcessorTopology topology = ProcessorTopologyFactories.with(Arrays.asList(new ProcessorNode[]{this.source1, this.source3}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source3)}), Collections.singletonList(this.stateStore), Collections.emptyMap());
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, this.partitions, topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createStatefulTask(StreamsConfig config, boolean logged) {
        return this.createStatefulTask(config, logged, this.stateManager);
    }

    private StreamTask createStatefulTask(StreamsConfig config, boolean logged, ProcessorStateManager stateManager) {
        MockKeyValueStore stateStore = new MockKeyValueStore("store", logged);
        ProcessorTopology topology = ProcessorTopologyFactories.with(Arrays.asList(new ProcessorNode[]{this.source1, this.source2}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}), Collections.singletonList(stateStore), logged ? Collections.singletonMap("store", "store-changelog") : Collections.emptyMap());
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, this.partitions, topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createSingleSourceStateless(StreamsConfig config, String builtInMetricsVersion) {
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[]{this.source1, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1)}));
        this.source1.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, Utils.mkSet((Object[])new TopicPartition[]{this.partition1}), topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", builtInMetricsVersion, (Time)this.time), this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createStatelessTask(StreamsConfig config) {
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[]{this.source1, this.source2, this.processorStreamTime, this.processorSystemTime}), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.source1), Utils.mkEntry((Object)"topic2", this.source2)}));
        this.source1.addChild(this.processorStreamTime);
        this.source2.addChild(this.processorStreamTime);
        this.source1.addChild(this.processorSystemTime);
        this.source2.addChild(this.processorSystemTime);
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, this.partitions, topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)this.time), this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private StreamTask createStatelessTaskWithForwardingTopology(SourceNode<Integer, Integer> sourceNode) {
        ProcessorTopology topology = StreamTaskTest.withSources(Arrays.asList(new ProcessorNode[]{sourceNode, this.processorStreamTime}), Collections.singletonMap("topic1", sourceNode));
        sourceNode.addChild(this.processorStreamTime);
        StreamsConfig config = StreamTaskTest.createConfig();
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        return new StreamTask(this.taskId, Collections.singleton(this.partition1), topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), new StreamsMetricsImpl(this.metrics, "test", "latest", (Time)this.time), this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private void createTimeoutTask(String eosConfig) {
        ProcessorTopology topology = StreamTaskTest.withSources(Collections.singletonList(this.timeoutSource), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"topic1", this.timeoutSource)}));
        StreamsConfig config = StreamTaskTest.createConfig(eosConfig, "0");
        ProcessorContextImpl context = new ProcessorContextImpl(this.taskId, config, this.stateManager, this.streamsMetrics, null);
        this.task = new StreamTask(this.taskId, Utils.mkSet((Object[])new TopicPartition[]{this.partition1}), topology, this.consumer, new TopologyConfig(null, config, new Properties()).getTaskConfig(), this.streamsMetrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, (InternalProcessorContext)context, this.logContext, false);
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(TopicPartition topicPartition, long offset, int value) {
        return new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), offset, offset, TimestampType.CREATE_TIME, 0, 0, (Object)this.recordKey, (Object)this.intSerializer.serialize(null, (Object)value), (Headers)new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(TopicPartition topicPartition, long offset) {
        return new ConsumerRecord(topicPartition.topic(), topicPartition.partition(), offset, offset, TimestampType.CREATE_TIME, 0, 0, (Object)this.recordKey, (Object)this.recordValue, (Headers)new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(Integer key, long offset) {
        return new ConsumerRecord("topic1", 1, offset, offset, TimestampType.CREATE_TIME, 0, 0, (Object)new IntegerSerializer().serialize("topic1", key), (Object)this.recordValue, (Headers)new RecordHeaders(), Optional.empty());
    }

    private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(long offset) {
        return new ConsumerRecord("topic1", 1, offset, offset, TimestampType.CREATE_TIME, -1, -1, (Object)new byte[0], (Object)"I am not an integer.".getBytes(), (Headers)new RecordHeaders(), Optional.empty());
    }

    private MetricName setupCloseTaskMetric() {
        MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
        Sensor sensor = this.streamsMetrics.threadLevelSensor(this.threadId, "task-closed", Sensor.RecordingLevel.INFO, new Sensor[0]);
        sensor.add(metricName, (MeasurableStat)new CumulativeSum());
        return metricName;
    }

    private void verifyCloseTaskMetric(double expected, StreamsMetricsImpl streamsMetrics, MetricName metricName) {
        KafkaMetric metric = (KafkaMetric)streamsMetrics.metrics().get(metricName);
        double totalCloses = metric.measurable().measure(metric.config(), System.currentTimeMillis());
        MatcherAssert.assertThat((Object)totalCloses, (Matcher)CoreMatchers.equalTo((Object)expected));
    }

    private /* synthetic */ void lambda$shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic$73(ProcessorTopology topology, StreamsMetricsImpl metrics, InternalProcessorContext context) throws Throwable {
        new StreamTask(this.taskId, this.partitions, topology, this.consumer, new TopologyConfig(null, StreamTaskTest.createConfig("100"), new Properties()).getTaskConfig(), metrics, this.stateDirectory, this.cache, (Time)this.time, this.stateManager, this.recordCollector, context, this.logContext, false);
    }
}

