package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.DeletedRecords;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StateUpdater;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;

@MockitoSettings(strictness = Strictness.STRICT_STUBS)
@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManagerTest.class */
public class TaskManagerTest {

    @Mock
    private InternalTopologyBuilder topologyBuilder;

    @org.easymock.Mock(type = MockType.DEFAULT)
    private StateDirectory stateDirectory;

    @Mock
    private ChangelogReader changeLogReader;

    @org.easymock.Mock(type = MockType.STRICT)
    private Consumer<byte[], byte[]> consumer;

    @Mock
    private ActiveTaskCreator activeTaskCreator;

    @Mock
    private StandbyTaskCreator standbyTaskCreator;

    @Mock
    private Admin adminClient;
    private TaskManager taskManager;
    private TopologyMetadata topologyMetadata;
    private final String topic1 = AssignmentTestUtils.TP_1_NAME;
    private final String topic2 = AssignmentTestUtils.TP_2_NAME;
    private final TaskId taskId00 = new TaskId(0, 0);
    private final TopicPartition t1p0 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 0);
    private final TopicPartition t1p0changelog = new TopicPartition("changelog", 0);
    private final Set<TopicPartition> taskId00Partitions = Utils.mkSet(new TopicPartition[]{this.t1p0});
    private final Set<TopicPartition> taskId00ChangelogPartitions = Utils.mkSet(new TopicPartition[]{this.t1p0changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId00Assignment = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
    private final TaskId taskId01 = new TaskId(0, 1);
    private final TopicPartition t1p1 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 1);
    private final TopicPartition t2p2 = new TopicPartition(AssignmentTestUtils.TP_2_NAME, 1);
    private final TopicPartition t1p1changelog = new TopicPartition("changelog", 1);
    private final Set<TopicPartition> taskId01Partitions = Utils.mkSet(new TopicPartition[]{this.t1p1});
    private final Set<TopicPartition> taskId01ChangelogPartitions = Utils.mkSet(new TopicPartition[]{this.t1p1changelog});
    private final Map<TaskId, Set<TopicPartition>> taskId01Assignment = Collections.singletonMap(this.taskId01, this.taskId01Partitions);
    private final TaskId taskId02 = new TaskId(0, 2);
    private final TopicPartition t1p2 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 2);
    private final TopicPartition t1p2changelog = new TopicPartition("changelog", 2);
    private final Set<TopicPartition> taskId02Partitions = Utils.mkSet(new TopicPartition[]{this.t1p2});
    private final Set<TopicPartition> taskId02ChangelogPartitions = Utils.mkSet(new TopicPartition[]{this.t1p2changelog});
    private final TaskId taskId03 = new TaskId(0, 3);
    private final TopicPartition t1p3 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 3);
    private final TopicPartition t1p3changelog = new TopicPartition("changelog", 3);
    private final Set<TopicPartition> taskId03Partitions = Utils.mkSet(new TopicPartition[]{this.t1p3});
    private final Set<TopicPartition> taskId03ChangelogPartitions = Utils.mkSet(new TopicPartition[]{this.t1p3changelog});
    private final TaskId taskId04 = new TaskId(0, 4);
    private final TopicPartition t1p4 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 4);
    private final TopicPartition t1p4changelog = new TopicPartition("changelog", 4);
    private final Set<TopicPartition> taskId04Partitions = Utils.mkSet(new TopicPartition[]{this.t1p4});
    private final Set<TopicPartition> taskId04ChangelogPartitions = Utils.mkSet(new TopicPartition[]{this.t1p4changelog});
    private final TaskId taskId05 = new TaskId(0, 5);
    private final TopicPartition t1p5 = new TopicPartition(AssignmentTestUtils.TP_1_NAME, 5);
    private final Set<TopicPartition> taskId05Partitions = Utils.mkSet(new TopicPartition[]{this.t1p5});
    private final TaskId taskId10 = new TaskId(1, 0);
    private final TopicPartition t2p0 = new TopicPartition(AssignmentTestUtils.TP_2_NAME, 0);
    private final Set<TopicPartition> taskId10Partitions = Utils.mkSet(new TopicPartition[]{this.t2p0});
    final java.util.function.Consumer<Set<TopicPartition>> noOpResetter = set -> {
    };
    final StateUpdater stateUpdater = (StateUpdater) Mockito.mock(StateUpdater.class);
    private final Time time = new MockTime();

    @Rule
    public final TemporaryFolder testFolder = new TemporaryFolder();

    @Rule
    public final MockitoRule rule = MockitoJUnit.rule().strictness(Strictness.STRICT_STUBS);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManagerTest$StateMachineTask.class */
    public static class StateMachineTask extends AbstractTask implements Task {
        private final boolean active;
        private boolean commitNeeded;
        private boolean commitRequested;
        private boolean commitPrepared;
        private boolean commitCompleted;
        private Map<TopicPartition, OffsetAndMetadata> committableOffsets;
        private Map<TopicPartition, Long> purgeableOffsets;
        private Map<TopicPartition, Long> changelogOffsets;
        private Set<TopicPartition> partitionsForOffsetReset;
        private Long timeout;
        private final Map<TopicPartition, LinkedList<ConsumerRecord<byte[], byte[]>>> queue;

        StateMachineTask(TaskId taskId, Set<TopicPartition> set, boolean z) {
            this(taskId, set, z, null);
        }

        StateMachineTask(TaskId taskId, Set<TopicPartition> set, boolean z, ProcessorStateManager processorStateManager) {
            super(taskId, (ProcessorTopology) null, (StateDirectory) null, processorStateManager, set, new TopologyConfig(new DummyStreamsConfig()).getTaskConfig(), "test-task", StateMachineTask.class);
            this.commitNeeded = false;
            this.commitRequested = false;
            this.commitPrepared = false;
            this.commitCompleted = false;
            this.committableOffsets = Collections.emptyMap();
            this.changelogOffsets = Collections.emptyMap();
            this.partitionsForOffsetReset = Collections.emptySet();
            this.timeout = null;
            this.queue = new HashMap();
            this.active = z;
        }

        public void initializeIfNeeded() {
            if (state() == Task.State.CREATED) {
                transitionTo(Task.State.RESTORING);
                if (this.active) {
                    return;
                }
                transitionTo(Task.State.RUNNING);
            }
        }

        public void addPartitionsForOffsetReset(Set<TopicPartition> set) {
            this.partitionsForOffsetReset = set;
        }

        public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> consumer) {
            if (state() == Task.State.RUNNING) {
                return;
            }
            transitionTo(Task.State.RUNNING);
        }

        public void setCommitNeeded() {
            this.commitNeeded = true;
        }

        public boolean commitNeeded() {
            return this.commitNeeded;
        }

        public void setCommitRequested() {
            this.commitRequested = true;
        }

        public boolean commitRequested() {
            return this.commitRequested;
        }

        public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
            this.commitPrepared = true;
            return this.commitNeeded ? this.committableOffsets : Collections.emptyMap();
        }

        public void postCommit(boolean z) {
            this.commitNeeded = false;
            this.commitCompleted = true;
        }

        public void suspend() {
            if (state() == Task.State.CLOSED) {
                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + this.id);
            }
            if (state() == Task.State.SUSPENDED) {
                return;
            }
            transitionTo(Task.State.SUSPENDED);
        }

        public void resume() {
            if (state() == Task.State.SUSPENDED) {
                transitionTo(Task.State.RUNNING);
            }
        }

        public void revive() {
            this.commitNeeded = false;
            this.commitRequested = false;
            super.revive();
        }

        public void maybeInitTaskTimeoutOrThrow(long j, Exception exc) {
            this.timeout = Long.valueOf(j);
        }

        public void clearTaskTimeout() {
            this.timeout = null;
        }

        public void recordRestoration(Time time, long j, boolean z) {
        }

        public void closeClean() {
            transitionTo(Task.State.CLOSED);
        }

        public void closeDirty() {
            transitionTo(Task.State.CLOSED);
        }

        public void prepareRecycle() {
            transitionTo(Task.State.CLOSED);
        }

        public void updateInputPartitions(Set<TopicPartition> set, Map<String, List<String>> map) {
            this.inputPartitions = set;
        }

        void setCommittableOffsetsAndMetadata(Map<TopicPartition, OffsetAndMetadata> map) {
            if (!this.active) {
                throw new IllegalStateException("Cannot set CommittableOffsetsAndMetadate for StandbyTasks");
            }
            this.committableOffsets = map;
        }

        public StateStore getStore(String str) {
            return null;
        }

        public Set<TopicPartition> changelogPartitions() {
            return this.changelogOffsets.keySet();
        }

        public boolean isActive() {
            return this.active;
        }

        void setPurgeableOffsets(Map<TopicPartition, Long> map) {
            this.purgeableOffsets = map;
        }

        public Map<TopicPartition, Long> purgeableOffsets() {
            return this.purgeableOffsets;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setChangelogOffsets(Map<TopicPartition, Long> map) {
            this.changelogOffsets = map;
        }

        public Map<TopicPartition, Long> changelogOffsets() {
            return this.changelogOffsets;
        }

        public Map<TopicPartition, Long> committedOffsets() {
            return Collections.emptyMap();
        }

        public Map<TopicPartition, Long> highWaterMark() {
            return Collections.emptyMap();
        }

        public Optional<Long> timeCurrentIdlingStarted() {
            return Optional.empty();
        }

        public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
            if (!isActive()) {
                throw new IllegalStateException("Can't add records to an inactive task.");
            }
            LinkedList<ConsumerRecord<byte[], byte[]>> computeIfAbsent = this.queue.computeIfAbsent(topicPartition, topicPartition2 -> {
                return new LinkedList();
            });
            Iterator<ConsumerRecord<byte[], byte[]>> it = iterable.iterator();
            while (it.hasNext()) {
                computeIfAbsent.add(it.next());
            }
        }

        public boolean process(long j) {
            if (!isActive() || state() != Task.State.RUNNING) {
                throw new IllegalStateException("Can't process an inactive or non-running task.");
            }
            Iterator<LinkedList<ConsumerRecord<byte[], byte[]>>> it = this.queue.values().iterator();
            while (it.hasNext()) {
                if (it.next().poll() != null) {
                    return true;
                }
            }
            return false;
        }
    }

    @Before
    public void setUp() {
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, false);
    }

    private TaskManager setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode, boolean z) {
        return setUpTaskManager(processingMode, null, z);
    }

    private TaskManager setUpTaskManager(StreamsConfigUtils.ProcessingMode processingMode, TasksRegistry tasksRegistry, boolean z) {
        this.topologyMetadata = new TopologyMetadata(this.topologyBuilder, new DummyStreamsConfig(processingMode));
        TaskManager taskManager = new TaskManager(this.time, this.changeLogReader, UUID.randomUUID(), "taskManagerTest", this.activeTaskCreator, this.standbyTaskCreator, tasksRegistry != null ? tasksRegistry : new Tasks(new LogContext()), this.topologyMetadata, this.adminClient, this.stateDirectory, z ? this.stateUpdater : null);
        taskManager.setMainConsumer(this.consumer);
        return taskManager;
    }

    @Test
    public void shouldClassifyExistingTasksWithoutStateUpdater() {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, false);
        Map<TaskId, Set<TopicPartition>> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId01, Utils.mkSet(new TopicPartition[]{this.t1p1}))});
        Map<TaskId, Set<TopicPartition>> mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId02, Utils.mkSet(new TopicPartition[]{this.t2p2}))});
        Map<TaskId, Set<TopicPartition>> mkMap3 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, Utils.mkSet(new TopicPartition[]{this.t1p3}))});
        HashMap hashMap = new HashMap(mkMap);
        hashMap.putAll(mkMap3);
        handleAssignment(mkMap, mkMap2, mkMap3);
        upTaskManager.handleAssignment(hashMap, mkMap2);
        Mockito.verifyNoInteractions(new Object[]{this.stateUpdater});
    }

    @Test
    public void shouldNotUpdateExistingStandbyTaskIfStandbyIsReassignedWithSameInputPartitionWithoutStateUpdater() {
        StandbyTask build = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(build, this.taskId03Partitions);
        ((StandbyTask) Mockito.verify(build, Mockito.never())).updateInputPartitions((Set) Mockito.eq(this.taskId03Partitions), (Map) Mockito.any());
    }

    @Test
    public void shouldUpdateExistingStandbyTaskIfStandbyIsReassignedWithDifferentInputPartitionWithoutStateUpdater() {
        StandbyTask build = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(build, this.taskId04Partitions);
        ((StandbyTask) Mockito.verify(build)).updateInputPartitions((Set) Mockito.eq(this.taskId04Partitions), (Map) Mockito.any());
    }

    private void updateExistingStandbyTaskIfStandbyIsReassignedWithoutStateUpdater(Task task, Set<TopicPartition> set) {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, false).handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), set)}));
        ((Task) Mockito.verify(task)).resume();
    }

    @Test
    public void shouldPrepareActiveTaskInStateUpdaterToBeRecycled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToRecycle(task.id(), task.inputPartitions());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldPrepareStandbyTaskInStateUpdaterToBeRecycled() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToRecycle(task.id(), task.inputPartitions());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldRemoveUnusedActiveTaskFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToCloseClean(task.id());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldRemoveUnusedStandbyTaskFromStateUpdater() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToCloseClean(task.id());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldUpdateInputPartitionOfActiveTaskInStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        Set<TopicPartition> set = this.taskId02Partitions;
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), set)}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToUpdateInputPartitions(task.id(), set);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldKeepReAssignedActiveTaskInStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldRemoveReAssignedRevokedActiveTaskInStateUpdaterFromPendingTaskToSuspend() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).removePendingActiveTaskToSuspend(task.id());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldNeverUpdateInputPartitionsOfStandbyTaskInStateUpdater() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        Set<TopicPartition> set = this.taskId03Partitions;
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), set)}));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addPendingTaskToUpdateInputPartitions(task.id(), set);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldKeepReAssignedStandbyTaskInStateUpdater() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldAssignMultipleTasksInStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task2.id(), task2.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToCloseClean(task.id());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task2.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToRecycle(task2.id(), task2.inputPartitions());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldReturnStateUpdaterTasksInAllTasks() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Mockito.when(tasksRegistry.allTasksPerId()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, build)}));
        Assert.assertEquals(upTaskManager.allTasks(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, build), Utils.mkEntry(this.taskId02, task)}));
    }

    @Test
    public void shouldNotReturnStateUpdaterTasksInOwnedTasks() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Mockito.when(tasksRegistry.allTasksPerId()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, build)}));
        Assert.assertEquals(upTaskManager.allOwnedTasks(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, build)}));
    }

    @Test
    public void shouldCreateActiveTaskDuringAssignment() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Set mkSet = Utils.mkSet(new Task[]{task});
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())});
        Mockito.when(this.activeTaskCreator.createTasks(this.consumer, mkMap)).thenReturn(mkSet);
        upTaskManager.handleAssignment(mkMap, Collections.emptyMap());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTasksToInit(mkSet);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldCreateStandbyTaskDuringAssignment() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Set mkSet = Utils.mkSet(new Task[]{task});
        Mockito.when(this.standbyTaskCreator.createTasks(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}))).thenReturn(mkSet);
        upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTasksToInit(mkSet);
    }

    @Test
    public void shouldAssignActiveTaskInTasksRegistryToBeRecycledWithStateUpdaterEnabled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.SUSPENDED).withInputPartitions(this.taskId03Partitions).build();
        StandbyTask build = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(task, task.inputPartitions())).thenReturn(build);
        upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(task.id());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StreamTask) Mockito.verify(task)).prepareCommit();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).replaceActiveWithStandby(build);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToRecycleIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Assert.assertEquals(((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        })).getMessage(), "Standby tasks should only be managed by the state updater, but standby task " + this.taskId03 + " is managed by the stream thread");
        Mockito.verifyNoInteractions(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldAssignActiveTaskInTasksRegistryToBeClosedCleanlyWithStateUpdaterEnabled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(task.id());
        ((StreamTask) Mockito.verify(task)).prepareCommit();
        ((StreamTask) Mockito.verify(task)).closeClean();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).removeTask(task);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToCloseIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Assert.assertEquals(((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            upTaskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        })).getMessage(), "Standby tasks should only be managed by the state updater, but standby task " + this.taskId03 + " is managed by the stream thread");
        Mockito.verifyNoInteractions(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldAssignActiveTaskInTasksRegistryToUpdateInputPartitionsWithStateUpdaterEnabled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        Set<TopicPartition> set = this.taskId02Partitions;
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Mockito.when(Boolean.valueOf(tasksRegistry.updateActiveTaskInputPartitions(task, set))).thenReturn(true);
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), set)}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StreamTask) Mockito.verify(task)).updateInputPartitions((Set) Mockito.eq(set), (Map) ArgumentMatchers.any());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldResumeActiveRunningTaskInTasksRegistryWithStateUpdaterEnabled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldResumeActiveSuspendedTaskInTasksRegistryAndAddToStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.SUSPENDED).withInputPartitions(this.taskId03Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), task.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Collections.emptyMap());
        ((StreamTask) Mockito.verify(task)).resume();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task);
        ((TasksRegistry) Mockito.verify(tasksRegistry)).removeTask(task);
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldThrowDuringAssignmentIfStandbyTaskToUpdateInputPartitionsIsFoundInTasksRegistryWithStateUpdaterEnabled() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        Set<TopicPartition> set = this.taskId03Partitions;
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Assert.assertEquals(((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
            upTaskManager.handleAssignment(Collections.emptyMap(), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(task.id(), set)}));
        })).getMessage(), "Standby tasks should only be managed by the state updater, but standby task " + this.taskId02 + " is managed by the stream thread");
        Mockito.verifyNoInteractions(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldAssignMultipleTasksInTasksRegistryWithStateUpdaterEnabled() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId03Partitions).build();
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        upTaskManager.handleAssignment(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(build.id(), build.inputPartitions())}), Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks(this.consumer, Utils.mkMap(new Map.Entry[]{Utils.mkEntry(build.id(), build.inputPartitions())}));
        ((StreamTask) Mockito.verify(task)).closeClean();
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldAddTasksToStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.drainPendingTasksToInit()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((StreamTask) Mockito.verify(task)).initializeIfNeeded();
        ((StandbyTask) Mockito.verify(task2)).initializeIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task2);
    }

    @Test
    public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.drainPendingTasksToInit()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        ((StreamTask) Mockito.doThrow(new Throwable[]{new LockException("Where are my keys??")}).when(task)).initializeIfNeeded();
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((StreamTask) Mockito.verify(task)).initializeIfNeeded();
        ((StandbyTask) Mockito.verify(task2)).initializeIfNeeded();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTasksToInit(Collections.singleton(task));
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task2);
    }

    @Test
    public void shouldRecycleTasksRemovedFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        StandbyTask build = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00Partitions).withInputPartitions(this.taskId00Partitions).build();
        StreamTask build2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01Partitions).withInputPartitions(this.taskId01Partitions).build();
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasRemovedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(task.id())).thenReturn(this.taskId00Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(task2.id())).thenReturn(this.taskId01Partitions);
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(task2, this.taskId01Partitions, this.consumer)).thenReturn(build2);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(task, this.taskId00Partitions)).thenReturn(build);
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded((TaskId) ArgumentMatchers.any());
        ((StreamTask) Mockito.verify(task)).suspend();
        ((StandbyTask) Mockito.verify(task2)).suspend();
        ((StandbyTask) Mockito.verify(build)).initializeIfNeeded();
        ((StreamTask) Mockito.verify(build2)).initializeIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build2);
    }

    @Test
    public void shouldCloseTasksRemovedFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasRemovedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle((TaskId) ArgumentMatchers.any())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean(task.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean(task2.id()))).thenReturn(true);
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded((TaskId) ArgumentMatchers.any());
        ((StreamTask) Mockito.verify(task)).suspend();
        ((StreamTask) Mockito.verify(task)).closeClean();
        ((StandbyTask) Mockito.verify(task2)).suspend();
        ((StandbyTask) Mockito.verify(task2)).closeClean();
    }

    @Test
    public void shouldUpdateInputPartitionsOfTasksRemovedFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).withInputPartitions(this.taskId00Partitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).withInputPartitions(this.taskId01Partitions).inState(Task.State.RUNNING).build();
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasRemovedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle((TaskId) ArgumentMatchers.any())).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(task.id())).thenReturn(this.taskId02Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(task2.id())).thenReturn(this.taskId03Partitions);
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((StreamTask) Mockito.verify(task)).updateInputPartitions((Set) Mockito.eq(this.taskId02Partitions), ArgumentMatchers.anyMap());
        ((StreamTask) Mockito.verify(task, Mockito.never())).closeDirty();
        ((StreamTask) Mockito.verify(task, Mockito.never())).closeClean();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task);
        ((StandbyTask) Mockito.verify(task2)).updateInputPartitions((Set) Mockito.eq(this.taskId03Partitions), ArgumentMatchers.anyMap());
        ((StandbyTask) Mockito.verify(task2, Mockito.never())).closeDirty();
        ((StandbyTask) Mockito.verify(task2, Mockito.never())).closeClean();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task2);
    }

    @Test
    public void shouldSuspendRevokedTaskRemovedFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(task.id())).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(task.id())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingActiveTaskToSuspend(task.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasRemovedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        this.taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        EasyMock.verify(new Object[]{this.consumer});
        ((StreamTask) Mockito.verify(task)).suspend();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(task);
    }

    @Test
    public void shouldHandleMultipleRemovedTasksFromStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId01Partitions).build();
        StandbyTask build = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).build();
        StreamTask build2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).build();
        Task task3 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        Task task4 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasRemovedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2, task3, task4}));
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby(task2, this.taskId01Partitions, this.consumer)).thenReturn(build2);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(task, this.taskId00Partitions)).thenReturn(build);
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet()).anyTimes();
        this.consumer.resume((Collection) EasyMock.anyObject());
        EasyMock.expectLastCall().anyTimes();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean(task3.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean((TaskId) ArgumentMatchers.argThat(taskId -> {
            return !taskId.equals(task3.id());
        })))).thenReturn(false);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(task.id())).thenReturn(this.taskId00Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(task2.id())).thenReturn(this.taskId01Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle((TaskId) ArgumentMatchers.argThat(taskId2 -> {
            return (taskId2.equals(task.id()) || taskId2.equals(task2.id())) ? false : true;
        }))).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(task4.id())).thenReturn(this.taskId04Partitions);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        upTaskManager.setMainConsumer(this.consumer);
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.checkStateUpdater(this.time.milliseconds(), set -> {
        });
        EasyMock.verify(new Object[]{this.consumer});
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator, Mockito.times(2))).closeAndRemoveTaskProducerIfNeeded((TaskId) ArgumentMatchers.any());
        ((StandbyTask) Mockito.verify(build)).initializeIfNeeded();
        ((StreamTask) Mockito.verify(build2)).initializeIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build2);
        ((StreamTask) Mockito.verify(task3)).closeClean();
        ((StreamTask) Mockito.verify(task4)).updateInputPartitions((Set) Mockito.eq(this.taskId04Partitions), ArgumentMatchers.anyMap());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task4);
    }

    @Test
    public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreRestoring() {
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Assert.assertFalse(setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true).checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
    }

    @Test
    public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToInitButPendingTasksToRecycle() {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(Boolean.valueOf(tasksRegistry.hasPendingTasksToRecycle())).thenReturn(true);
        Assert.assertFalse(setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
    }

    @Test
    public void shouldReturnFalseFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTaskToRecycleButPendingTasksToInit() {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(Boolean.valueOf(tasksRegistry.hasPendingTasksToInit())).thenReturn(true);
        Assert.assertFalse(setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
    }

    @Test
    public void shouldReturnTrueFromCheckStateUpdaterIfActiveTasksAreNotRestoringAndNoPendingTasksToRecycleAndInit() {
        Assert.assertTrue(setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true).checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
    }

    @Test
    public void shouldAddActiveTaskWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager taskManager = setupForRevocationAndLost(Utils.mkSet(new Task[]{task}), tasksRegistry);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        taskManager.handleRevocation(task.inputPartitions());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingActiveTaskToSuspend(task.id());
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).remove(task.id());
    }

    public void shouldAddMultipleActiveTasksWithRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        Task task2 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        setupForRevocationAndLost(Utils.mkSet(new Task[]{task, task2}), tasksRegistry).handleRevocation(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingActiveTaskToSuspend(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingActiveTaskToSuspend(task2.id());
    }

    @Test
    public void shouldNotAddActiveTaskWithoutRevokedInputPartitionsInStateUpdaterToPendingTasksToSuspend() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        setupForRevocationAndLost(Utils.mkSet(new Task[]{task}), tasksRegistry).handleRevocation(this.taskId01Partitions);
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addPendingActiveTaskToSuspend(task.id());
    }

    @Test
    public void shouldNotRevokeStandbyTaskInStateUpdaterOnRevocation() {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        setupForRevocationAndLost(Utils.mkSet(new Task[]{task}), tasksRegistry).handleRevocation(this.taskId00Partitions);
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).remove(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addPendingActiveTaskToSuspend(task.id());
    }

    @Test
    public void shouldRemoveAllActiveTasksFromStateUpdaterOnPartitionLost() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        Task task3 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        setupForRevocationAndLost(Utils.mkSet(new Task[]{task, task2, task3}), tasksRegistry).handleLostAll();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task.id());
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).remove(task2.id());
        ((StateUpdater) Mockito.verify(this.stateUpdater)).remove(task3.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToCloseDirty(task.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addPendingTaskToCloseDirty(task2.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addPendingTaskToCloseClean(task2.id());
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTaskToCloseDirty(task3.id());
    }

    private TaskManager setupForRevocationAndLost(Set<Task> set, TasksRegistry tasksRegistry) {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(set);
        return upTaskManager;
    }

    @Test
    public void shouldTransitRestoredTaskToRunning() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTransitionToRunningOfRestoredTask = setUpTransitionToRunningOfRestoredTask(build, tasksRegistry);
        this.consumer.resume(build.inputPartitions());
        EasyMock.replay(new Object[]{this.consumer});
        upTransitionToRunningOfRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((StreamTask) Mockito.verify(build)).completeRestoration(this.noOpResetter);
        ((StreamTask) Mockito.verify(build)).clearTaskTimeout();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(build);
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldHandleTimeoutExceptionInTransitRestoredTaskToRunning() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTransitionToRunningOfRestoredTask = setUpTransitionToRunningOfRestoredTask(build, tasksRegistry);
        Throwable timeoutException = new TimeoutException();
        ((StreamTask) Mockito.doThrow(new Throwable[]{timeoutException}).when(build)).completeRestoration(this.noOpResetter);
        EasyMock.replay(new Object[]{this.consumer});
        upTransitionToRunningOfRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((StreamTask) Mockito.verify(build)).maybeInitTaskTimeoutOrThrow(ArgumentMatchers.anyLong(), (Exception) Mockito.eq(timeoutException));
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).addTask(build);
        ((StreamTask) Mockito.verify(build, Mockito.never())).clearTaskTimeout();
        EasyMock.verify(new Object[]{this.consumer});
    }

    private TaskManager setUpTransitionToRunningOfRestoredTask(StreamTask streamTask, TasksRegistry tasksRegistry) {
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(streamTask.id())).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(streamTask.id())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{streamTask}));
        return setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
    }

    @Test
    public void shouldReturnCorrectBooleanWhenTryingToCompleteRestorationWithStateUpdater() {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(false);
        Assert.assertTrue(upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Assert.assertFalse(upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter));
    }

    @Test
    public void shouldRecycleRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StandbyTask build2 = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        TaskManager upRecycleRestoredTask = setUpRecycleRestoredTask(build);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(build, build.inputPartitions())).thenReturn(build2);
        upRecycleRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        ((StreamTask) Mockito.verify(build)).suspend();
        ((StandbyTask) Mockito.verify(build2)).initializeIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build2);
    }

    @Test
    public void shouldHandleExceptionThrownDuringConversionInRecycleRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TaskManager upRecycleRestoredTask = setUpRecycleRestoredTask(build);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(build, build.inputPartitions())).thenThrow(new Throwable[]{new RuntimeException()});
        Assert.assertThrows(StreamsException.class, () -> {
            upRecycleRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).add((Task) ArgumentMatchers.any());
        ((StreamTask) Mockito.verify(build)).closeDirty();
    }

    @Test
    public void shouldHandleExceptionThrownDuringTaskInitInRecycleRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CLOSED).withInputPartitions(this.taskId00Partitions).build();
        StandbyTask build2 = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        TaskManager upRecycleRestoredTask = setUpRecycleRestoredTask(build);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(build, build.inputPartitions())).thenReturn(build2);
        ((StandbyTask) Mockito.doThrow(StreamsException.class).when(build2)).initializeIfNeeded();
        Assert.assertThrows(StreamsException.class, () -> {
            upRecycleRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        ((StateUpdater) Mockito.verify(this.stateUpdater, Mockito.never())).add((Task) ArgumentMatchers.any());
        ((StandbyTask) Mockito.verify(build2)).closeDirty();
    }

    private TaskManager setUpRecycleRestoredTask(StreamTask streamTask) {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(streamTask.id())).thenReturn(this.taskId00Partitions);
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{streamTask}));
        return setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
    }

    @Test
    public void shouldCloseCleanRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        setUpCloseCleanRestoredTask(build, tasksRegistry).checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        ((StreamTask) Mockito.verify(build)).suspend();
        ((StreamTask) Mockito.verify(build)).closeClean();
        ((StreamTask) Mockito.verify(build, Mockito.never())).closeDirty();
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).removeTask(build);
    }

    @Test
    public void shouldHandleExceptionThrownDuringCloseInCloseCleanRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upCloseCleanRestoredTask = setUpCloseCleanRestoredTask(build, tasksRegistry);
        ((StreamTask) Mockito.doThrow(RuntimeException.class).when(build)).closeClean();
        Assert.assertThrows(RuntimeException.class, () -> {
            upCloseCleanRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        ((StreamTask) Mockito.verify(build)).closeDirty();
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).removeTask(build);
    }

    @Test
    public void shouldHandleExceptionThrownDuringClosingTaskProducerInCloseCleanRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CLOSED).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upCloseCleanRestoredTask = setUpCloseCleanRestoredTask(build, tasksRegistry);
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("Something happened")}).when(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        Assert.assertThrows(RuntimeException.class, () -> {
            upCloseCleanRestoredTask.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        ((StreamTask) Mockito.verify(build, Mockito.never())).closeDirty();
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).removeTask(build);
    }

    private TaskManager setUpCloseCleanRestoredTask(StreamTask streamTask, TasksRegistry tasksRegistry) {
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(streamTask.id())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean(streamTask.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{streamTask}));
        return setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
    }

    @Test
    public void shouldCloseDirtyRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(build.id())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseDirty(build.id()))).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{build}));
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        ((StreamTask) Mockito.verify(build)).prepareCommit();
        ((StreamTask) Mockito.verify(build)).suspend();
        ((StreamTask) Mockito.verify(build)).closeDirty();
        ((StreamTask) Mockito.verify(build, Mockito.never())).closeClean();
        ((TasksRegistry) Mockito.verify(tasksRegistry, Mockito.never())).removeTask(build);
    }

    @Test
    public void shouldUpdateInputPartitionsOfRestoredTask() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(build.id())).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(build.id())).thenReturn(this.taskId01Partitions);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{build}));
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        this.consumer.resume(build.inputPartitions());
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        EasyMock.verify(new Object[]{this.consumer});
        ((StreamTask) Mockito.verify(build)).updateInputPartitions((Set) Mockito.eq(this.taskId01Partitions), ArgumentMatchers.anyMap());
        ((StreamTask) Mockito.verify(build)).completeRestoration(this.noOpResetter);
        ((StreamTask) Mockito.verify(build)).clearTaskTimeout();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(build);
    }

    @Test
    public void shouldSuspendRestoredTaskIfRevoked() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(build.id())).thenReturn((Object) null);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(build.id())).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingActiveTaskToSuspend(build.id()))).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{build}));
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        EasyMock.verify(new Object[]{this.consumer});
        ((StreamTask) Mockito.verify(build)).suspend();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(build);
    }

    @Test
    public void shouldHandleMultipleRestoredTasks() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask build2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        StandbyTask build3 = StreamsTestUtils.TaskBuilder.standbyTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId01Partitions).build();
        StreamTask build4 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId02Partitions).build();
        StreamTask build5 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        StreamTask build6 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId04, this.taskId04ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId04Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive(build2, build2.inputPartitions())).thenReturn(build3);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle(build2.id())).thenReturn(this.taskId01Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToRecycle((TaskId) ArgumentMatchers.argThat(taskId -> {
            return !taskId.equals(build2.id());
        }))).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean(build4.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseClean((TaskId) ArgumentMatchers.argThat(taskId2 -> {
            return !taskId2.equals(build4.id());
        })))).thenReturn(false);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseDirty(build5.id()))).thenReturn(true);
        Mockito.when(Boolean.valueOf(tasksRegistry.removePendingTaskToCloseDirty((TaskId) ArgumentMatchers.argThat(taskId3 -> {
            return !taskId3.equals(build5.id());
        })))).thenReturn(false);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions(build6.id())).thenReturn(this.taskId05Partitions);
        Mockito.when(tasksRegistry.removePendingTaskToUpdateInputPartitions((TaskId) ArgumentMatchers.argThat(taskId4 -> {
            return !taskId4.equals(build6.id());
        }))).thenReturn((Object) null);
        Mockito.when(Boolean.valueOf(this.stateUpdater.restoresActiveTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks((Duration) ArgumentMatchers.any(Duration.class))).thenReturn(Utils.mkSet(new StreamTask[]{build, build2, build4, build5, build6}));
        upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(build);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build3);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(build3);
        ((StreamTask) Mockito.verify(build4)).closeClean();
        ((StreamTask) Mockito.verify(build5)).closeDirty();
        ((StreamTask) Mockito.verify(build6)).updateInputPartitions((Set) Mockito.eq(this.taskId05Partitions), ArgumentMatchers.anyMap());
    }

    @Test
    public void shouldRethrowStreamsExceptionFromStateUpdater() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamsException streamsException = new StreamsException("boom!");
        StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(Collections.singleton(build), streamsException);
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasExceptionsAndFailedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        Assert.assertEquals(streamsException, assertThrows);
        Assert.assertEquals(build.id(), assertThrows.taskId().get());
    }

    @Test
    public void shouldRethrowRuntimeExceptionFromStateUpdater() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        RuntimeException runtimeException = new RuntimeException("boom!");
        StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(Collections.singleton(build), runtimeException);
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasExceptionsAndFailedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Collections.singletonList(exceptionAndTasks));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        Assert.assertEquals(runtimeException, assertThrows.getCause());
        Assert.assertEquals(build.id(), assertThrows.taskId().get());
        Assert.assertEquals("Encounter unexpected fatal error for task 0_0", assertThrows.getMessage());
    }

    @Test
    public void shouldRethrowTaskCorruptedExceptionFromStateUpdater() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId00Partitions).build();
        StreamTask build2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId01Partitions).build();
        StateUpdater.ExceptionAndTasks exceptionAndTasks = new StateUpdater.ExceptionAndTasks(Collections.singleton(build), new TaskCorruptedException(Collections.singleton(this.taskId00)));
        StateUpdater.ExceptionAndTasks exceptionAndTasks2 = new StateUpdater.ExceptionAndTasks(Collections.singleton(build2), new TaskCorruptedException(Collections.singleton(this.taskId01)));
        Mockito.when(Boolean.valueOf(this.stateUpdater.hasExceptionsAndFailedTasks())).thenReturn(true);
        Mockito.when(this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(exceptionAndTasks, exceptionAndTasks2));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        TaskCorruptedException assertThrows = Assert.assertThrows(TaskCorruptedException.class, () -> {
            upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        Assert.assertEquals(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01}), assertThrows.corruptedTasks());
        Assert.assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", assertThrows.getMessage());
    }

    @Test
    public void shouldRethrowTaskCorruptedExceptionFromInitialization() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId00Partitions).build();
        Task task2 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId01Partitions).build();
        Task task3 = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.CREATED).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, tasksRegistry, true);
        Mockito.when(tasksRegistry.drainPendingTasksToInit()).thenReturn(Utils.mkSet(new Task[]{task, task2, task3}));
        ((StreamTask) Mockito.doThrow(new Throwable[]{new TaskCorruptedException(Collections.singleton(((StreamTask) task).id))}).when(task)).initializeIfNeeded();
        ((StreamTask) Mockito.doThrow(new Throwable[]{new TaskCorruptedException(Collections.singleton(((StreamTask) task2).id))}).when(task2)).initializeIfNeeded();
        TaskCorruptedException assertThrows = Assert.assertThrows(TaskCorruptedException.class, () -> {
            upTaskManager.checkStateUpdater(this.time.milliseconds(), this.noOpResetter);
        });
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(task);
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addTask(task2);
        ((StateUpdater) Mockito.verify(this.stateUpdater)).add(task3);
        Assert.assertEquals(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01}), assertThrows.corruptedTasks());
        Assert.assertEquals("Tasks [0_1, 0_0] are corrupted and hence need to be re-initialized", assertThrows.getMessage());
    }

    @Test
    public void shouldAddSubscribedTopicsFromAssignmentToTopologyMetadata() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId01, Utils.mkSet(new TopicPartition[]{this.t1p1})), Utils.mkEntry(this.taskId02, Utils.mkSet(new TopicPartition[]{this.t1p2, this.t2p2}))});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, Utils.mkSet(new TopicPartition[]{this.t1p3})), Utils.mkEntry(this.taskId04, Utils.mkSet(new TopicPartition[]{this.t1p4}))});
        Mockito.when(this.standbyTaskCreator.createTasks(mkMap2)).thenReturn(Collections.emptySet());
        this.taskManager.handleAssignment(mkMap, mkMap2);
        ((InternalTopologyBuilder) Mockito.verify(this.topologyBuilder)).addSubscribedTopicsFromAssignment((Set) Mockito.eq(Utils.mkSet(new TopicPartition[]{this.t1p1, this.t1p2, this.t2p2})), Mockito.anyString());
        ((InternalTopologyBuilder) Mockito.verify(this.topologyBuilder, Mockito.never())).addSubscribedTopicsFromAssignment((Set) Mockito.eq(Utils.mkSet(new TopicPartition[]{this.t1p3, this.t1p4})), Mockito.anyString());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap));
    }

    @Test
    public void shouldNotLockAnythingIfStateDirIsEmpty() {
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList()).once();
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        EasyMock.verify(new Object[]{this.stateDirectory});
        Assert.assertTrue(this.taskManager.lockedTaskDirectories().isEmpty());
    }

    @Test
    public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
        expectLockObtainedFor(this.taskId01);
        expectLockFailedFor(this.taskId10);
        makeTaskFolders(this.taskId01.toString(), this.taskId10.toString(), "dummy");
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        EasyMock.verify(new Object[]{this.stateDirectory});
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Collections.singleton(this.taskId01)));
    }

    @Test
    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1});
        EasyMock.expect(this.consumer.assignment()).andReturn(mkSet);
        this.consumer.pause(mkSet);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleRebalanceComplete();
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId00Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        EasyMock.expect(this.consumer.assignment()).andReturn(Utils.mkSet(new TopicPartition[]{this.t1p0, this.t1p1}));
        this.consumer.pause(Utils.mkSet(new TopicPartition[]{this.t1p1}));
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.handleRebalanceComplete();
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception {
        expectLockObtainedFor(this.taskId00, this.taskId01, this.taskId02);
        expectUnlockFor(this.taskId02);
        makeTaskFolders(this.taskId00.toString(), this.taskId01.toString(), this.taskId02.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01, this.taskId02})));
        handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());
        EasyMock.reset(new Object[]{this.consumer});
        expectConsumerAssignmentPaused(this.consumer);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleRebalanceComplete();
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldReportLatestOffsetAsOffsetSumForRunningTask() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), -2L), Utils.mkEntry(new TopicPartition("changelog", 1), -2L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, -2L)}));
    }

    @Test
    public void shouldComputeOffsetSumForNonRunningActiveTask() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)}));
    }

    @Test
    public void shouldComputeOffsetSumForRestoringActiveTaskWithStateUpdater() throws Exception {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RESTORING).build();
        Mockito.when(task.changelogOffsets()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p0changelog, 42L)}));
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p0changelog, 24L)}));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        EasyMock.replay(new Object[]{this.stateDirectory});
        upTaskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        MatcherAssert.assertThat(upTaskManager.getTaskOffsetSums(), Matchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 42L)})));
    }

    @Test
    public void shouldComputeOffsetSumForRestoringStandbyTaskWithStateUpdater() throws Exception {
        Task task = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when(task.changelogOffsets()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p0changelog, 42L)}));
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p0changelog, 24L)}));
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, (TasksRegistry) Mockito.mock(TasksRegistry.class), true);
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        EasyMock.replay(new Object[]{this.stateDirectory});
        upTaskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        MatcherAssert.assertThat(upTaskManager.getTaskOffsetSums(), Matchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 42L)})));
    }

    @Test
    public void shouldComputeOffsetSumForRunningStatefulTaskAndRestoringTaskWithStateUpdater() {
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId00, this.taskId00ChangelogPartitions).inState(Task.State.RUNNING).build();
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when(build.changelogOffsets()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p0changelog, 42L)}));
        Mockito.when(task.changelogOffsets()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p1changelog, 24L)}));
        Mockito.when(task2.changelogOffsets()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.t1p2changelog, 84L)}));
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.allTasksPerId()).thenReturn(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, build)}));
        Mockito.when(this.stateUpdater.getTasks()).thenReturn(Utils.mkSet(new Task[]{task2, task}));
        MatcherAssert.assertThat(upTaskManager.getTaskOffsetSums(), Matchers.is(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 42L), Utils.mkEntry(this.taskId01, 24L), Utils.mkEntry(this.taskId02, 84L)})));
    }

    @Test
    public void shouldSkipUnknownOffsetsWhenComputingOffsetSum() throws Exception {
        computeOffsetSumAndVerify(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), -4L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)}), Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 10L)}));
    }

    private void computeOffsetSumAndVerify(Map<TopicPartition, Long> map, Map<TaskId, Long> map2) throws Exception {
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        handleAssignment(Collections.emptyMap(), Collections.emptyMap(), this.taskId00Assignment).get(this.taskId00).setChangelogOffsets(map);
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(map2));
    }

    @Test
    public void shouldComputeOffsetSumForStandbyTask() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        handleAssignment(Collections.emptyMap(), this.taskId00Assignment, Collections.emptyMap()).get(this.taskId00).setChangelogOffsets(mkMap);
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForUninitializedTask() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singleton(stateMachineTask));
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldComputeOffsetSumFromCheckpointFileForClosedTask() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 0), 5L), Utils.mkEntry(new TopicPartition("changelog", 1), 10L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, 15L)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singleton(stateMachineTask));
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        stateMachineTask.suspend();
        stateMachineTask.closeClean();
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldNotReportOffsetSumsForTaskWeCantLock() throws Exception {
        expectLockFailedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        Assert.assertTrue(this.taskManager.lockedTaskDirectories().isEmpty());
        Assert.assertTrue(this.taskManager.getTaskOffsetSums().isEmpty());
    }

    @Test
    public void shouldNotReportOffsetSumsAndReleaseLockForUnassignedTaskWithoutCheckpoint() throws Exception {
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        EasyMock.expect(this.stateDirectory.checkpointFileFor(this.taskId00)).andReturn(getCheckpointFile(this.taskId00));
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        Assert.assertTrue(this.taskManager.getTaskOffsetSums().isEmpty());
        EasyMock.verify(new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws Exception {
        Map<TopicPartition, Long> mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(new TopicPartition("changelog", 1), 4611686018427387903L), Utils.mkEntry(new TopicPartition("changelog", 2), 4611686018427387903L), Utils.mkEntry(new TopicPartition("changelog", 3), 4611686018427387903L)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, Long.MAX_VALUE)});
        expectLockObtainedFor(this.taskId00);
        makeTaskFolders(this.taskId00.toString());
        writeCheckpointFile(this.taskId00, mkMap);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TOPIC_PREFIX));
        MatcherAssert.assertThat(this.taskManager.getTaskOffsetSums(), Matchers.is(mkMap2));
    }

    @Test
    public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expectLastCall();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    @Test
    public void shouldCloseDirtyActiveUnassignedTasksWhenErrorCleanClosingTask() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.1
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeClean() {
                throw new RuntimeException("KABOOM!");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleRevocation(this.taskId00Partitions);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Encounter unexpected fatal error for task 0_0"));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("KABOOM!"));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    @Test
    public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask2));
        makeTaskFolders(this.taskId00.toString(), this.taskId01.toString());
        expectLockObtainedFor(this.taskId00, this.taskId01);
        makeTaskFolders(new String[0]);
        expectLockObtainedFor(new TaskId[0]);
        EasyMock.replay(new Object[]{this.stateDirectory});
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleLostAll();
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.is(Collections.singletonMap(this.taskId01, stateMachineTask2)));
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(this.taskManager.lockedTaskDirectories(), Matchers.is(Collections.emptySet()));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    @Test
    public void shouldReInitializeThreadProducerOnHandleLostAllIfEosV2Enabled() {
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false).handleLostAll();
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).reInitializeThreadProducer();
    }

    @Test
    public void shouldThrowWhenHandlingClosingTasksOnProducerCloseError() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("KABOOM!")}).when(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("Encounter unexpected fatal error for task 0_0"));
        MatcherAssert.assertThat(runtimeException.getCause(), Matchers.instanceOf(RuntimeException.class));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("KABOOM!"));
    }

    @Test
    public void shouldReAddRevivedTasksToStateUpdater() {
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId03, this.taskId03ChangelogPartitions).inState(Task.State.RESTORING).withInputPartitions(this.taskId03Partitions).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).withInputPartitions(this.taskId02Partitions).build();
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true);
        Mockito.when(tasksRegistry.task(this.taskId03)).thenReturn(task);
        Mockito.when(tasksRegistry.task(this.taskId02)).thenReturn(task2);
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.handleCorruption(Utils.mkSet(new TaskId[]{task.id(), task2.id()}));
        InOrder inOrder = Mockito.inOrder(new Object[]{task});
        ((StreamTask) inOrder.verify(task)).closeDirty();
        ((StreamTask) inOrder.verify(task)).revive();
        InOrder inOrder2 = Mockito.inOrder(new Object[]{task2});
        ((StandbyTask) inOrder2.verify(task2)).closeDirty();
        ((StandbyTask) inOrder2.verify(task2)).revive();
        ((TasksRegistry) Mockito.verify(tasksRegistry)).removeTask(task);
        ((TasksRegistry) Mockito.verify(tasksRegistry)).removeTask(task2);
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTasksToInit(Utils.mkSet(new Task[]{task}));
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addPendingTasksToInit(Utils.mkSet(new Task[]{task2}));
    }

    @Test
    public void shouldReviveCorruptTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.2
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void postCommit(boolean z) {
                if (z) {
                    atomicBoolean.set(true);
                }
                super.postCommit(z);
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        })), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask.partitionsForOffsetReset, IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldReviveCorruptTasksEvenIfTheyCannotCloseClean() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.3
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        })), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask.partitionsForOffsetReset, IsEqual.equalTo(this.taskId00Partitions));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldCommitNonCorruptedTasksOnTaskCorruptedException() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager);
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        expectRestoreToBeCompleted(this.consumer);
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        this.consumer.commitSync((Map) EasyMock.eq(Collections.emptyMap()));
        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not invoke commitSync when offset map is empty"));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), set -> {
            MatcherAssert.assertThat(set, Matchers.is(Matchers.empty()));
        })), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        Assert.assertTrue(stateMachineTask2.commitPrepared);
        MatcherAssert.assertThat(stateMachineTask2.partitionsForOffsetReset, IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat(stateMachineTask.partitionsForOffsetReset, IsEqual.equalTo(this.taskId00Partitions));
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldNotCommitNonRunningNonCorruptedTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager);
        stateMachineTask2.setCommitNeeded();
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        EasyMock.expect(this.consumer.assignment()).andReturn(this.taskId00Partitions);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.partitionsForOffsetReset, IsEqual.equalTo(Collections.emptySet()));
        MatcherAssert.assertThat(stateMachineTask.partitionsForOffsetReset, IsEqual.equalTo(this.taskId00Partitions));
        Assert.assertFalse(stateMachineTask2.commitPrepared);
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldCleanAndReviveCorruptedStandbyTasksBeforeCommittingNonCorruptedTasks() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.4
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new TaskMigratedException("You dropped out of the group!", new RuntimeException());
            }
        };
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId01Assignment))).thenReturn(Collections.singleton(stateMachineTask2));
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singleton(stateMachineTask));
        expectRestoreToBeCompleted(this.consumer);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId01Assignment, this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        });
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldNotAttemptToCommitInHandleCorruptedDuringARebalance() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andStubReturn(new ArrayList());
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask2.setCommitNeeded();
        HashMap hashMap = new HashMap();
        hashMap.putAll(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        expectRestoreToBeCompleted(this.consumer);
        EasyMock.expect(this.consumer.assignment()).andStubReturn(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay(new Object[]{this.consumer, this.stateDirectory});
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap);
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(false));
        this.taskManager.handleRebalanceStart(Collections.singleton(AssignmentTestUtils.TP_1_NAME));
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.rebalanceInProgress()), Matchers.is(true));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitWithALOS() {
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.5
            public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
                Assert.fail("Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap);
        HashMap hashMap = new HashMap();
        hashMap.putAll(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        expectRestoreToBeCompleted(this.consumer);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new TimeoutException());
        EasyMock.expect(this.consumer.assignment()).andStubReturn(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0, 0L));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitCompleted), Matchers.is(false));
        this.taskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitCompleted), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00Partitions);
    }

    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringHandleCorruptedWithEOS() {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.threadProducer()).thenReturn(streamsProducer);
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.6
            public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
                super.markChangelogAsCorrupted(collection);
                atomicBoolean.set(true);
            }
        };
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.7
            public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
                super.markChangelogAsCorrupted(collection);
                atomicBoolean2.set(true);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap);
        HashMap hashMap = new HashMap();
        hashMap.putAll(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        expectRestoreToBeCompleted(this.consumer);
        ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("appId");
        EasyMock.expect(this.consumer.groupMetadata()).andReturn(consumerGroupMetadata);
        ((StreamsProducer) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(streamsProducer)).commitTransaction(singletonMap, consumerGroupMetadata);
        EasyMock.expect(this.consumer.assignment()).andStubReturn(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions}));
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(upTaskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask2.setCommitNeeded();
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0changelog, 0L));
        stateMachineTask2.setChangelogOffsets(Collections.singletonMap(this.t1p1changelog, 0L));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitCompleted), Matchers.is(false));
        upTaskManager.handleCorruption(Collections.singleton(this.taskId00));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitCompleted), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitCompleted), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(true));
        EasyMock.verify(new Object[]{this.consumer});
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
    }

    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithALOS() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.8
            public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
                Assert.fail("Should not try to mark changelogs as corrupted for uncorrupted task");
            }
        };
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        EasyMock.expectLastCall();
        this.consumer.commitSync(hashMap);
        EasyMock.expectLastCall().andThrow(new TimeoutException());
        EasyMock.expect(this.consumer.assignment()).andStubReturn(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
    }

    @Test
    public void shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithEOS() {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.threadProducer()).thenReturn(streamsProducer);
        ProcessorStateManager processorStateManager = (ProcessorStateManager) Mockito.mock(ProcessorStateManager.class);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true, processorStateManager);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true, processorStateManager) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.9
            public void markChangelogAsCorrupted(Collection<TopicPartition> collection) {
                super.markChangelogAsCorrupted(collection);
                atomicBoolean.set(true);
            }
        };
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true, processorStateManager);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("appId");
        EasyMock.expect(this.consumer.groupMetadata()).andReturn(consumerGroupMetadata);
        ((StreamsProducer) Mockito.doThrow(new Throwable[]{new TimeoutException()}).when(streamsProducer)).commitTransaction(hashMap, consumerGroupMetadata);
        EasyMock.expect(this.consumer.assignment()).andStubReturn(Utils.union(HashSet::new, new Set[]{this.taskId00Partitions, this.taskId01Partitions, this.taskId02Partitions}));
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(upTaskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setChangelogOffsets(Collections.singletonMap(this.t1p0changelog, 0L));
        stateMachineTask2.setChangelogOffsets(Collections.singletonMap(this.t1p1changelog, 0L));
        upTaskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId00ChangelogPartitions);
        ((ProcessorStateManager) Mockito.verify(processorStateManager)).markChangelogAsCorrupted(this.taskId01ChangelogPartitions);
    }

    @Test
    public void shouldCloseStandbyUnassignedTasksWhenCreatingNewTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
    }

    @Test
    public void shouldAddNonResumedSuspendedTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(Collections.emptyMap()));
    }

    @Test
    public void shouldUpdateInputPartitionsAfterRebalance() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.t1p1});
        this.taskManager.handleAssignment(Collections.singletonMap(this.taskId00, mkSet), Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertEquals(mkSet, stateMachineTask.inputPartitions());
        EasyMock.verify(new Object[]{this.consumer});
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(Collections.emptyMap()));
    }

    @Test
    public void shouldAddNewActiveTasks() {
        Map<TaskId, Set<TopicPartition>> map = this.taskId00Assignment;
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(map))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(map, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), set -> {
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
    }

    @Test
    public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.10
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void initializeIfNeeded() {
                throw new LockException("can't lock");
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.11
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void initializeIfNeeded() {
                throw new TimeoutException("timed out");
            }
        };
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
    }

    @Test
    public void shouldNotCompleteRestorationIfTaskCannotCompleteRestoration() {
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.12
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> consumer) {
                throw new TimeoutException("timeout!");
            }
        };
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        EasyMock.expectLastCall();
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
    }

    @Test
    public void shouldSuspendActiveTasksDuringRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
    }

    @Test
    public void shouldCommitAllActiveTasksThatNeedCommittingOnHandleRevocationWithEosV2() {
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        Mockito.when(this.activeTaskCreator.threadProducer()).thenReturn(streamsProducer);
        Mockito.when(this.standbyTaskCreator.createTasks(mkMap2)).thenReturn(Collections.singletonList(stateMachineTask4));
        ConsumerGroupMetadata consumerGroupMetadata = new ConsumerGroupMetadata("appId");
        EasyMock.expect(this.consumer.groupMetadata()).andReturn(consumerGroupMetadata);
        streamsProducer.commitTransaction(hashMap, consumerGroupMetadata);
        EasyMock.expectLastCall();
        stateMachineTask.committedOffsets();
        EasyMock.expectLastCall();
        stateMachineTask2.committedOffsets();
        EasyMock.expectLastCall();
        stateMachineTask3.committedOffsets();
        EasyMock.expectLastCall();
        stateMachineTask4.committedOffsets();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(upTaskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        upTaskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldCommitAllNeededTasksOnHandleRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        stateMachineTask2.setCommitNeeded();
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        Mockito.when(this.standbyTaskCreator.createTasks(mkMap2)).thenReturn(Collections.singletonList(stateMachineTask4));
        this.consumer.commitSync(hashMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitPrepared), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOnHandleAssignmentIfNoTaskClosed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map singletonMap2 = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(singletonMap))).thenReturn(Collections.singleton(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(singletonMap2)).thenReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitPrepared), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOnHandleAssignmentIfOnlyStandbyTaskClosed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        stateMachineTask.setCommitNeeded();
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        Map singletonMap2 = Collections.singletonMap(this.taskId10, this.taskId10Partitions);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(singletonMap))).thenReturn(Collections.singleton(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(singletonMap2)).thenReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(singletonMap, singletonMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        this.taskManager.handleAssignment(singletonMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(true));
    }

    @Test
    public void shouldNotCommitCreatedTasksOnRevocationOrClosure() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.handleRevocation(this.taskId00Partitions);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded((TaskId) EasyMock.eq(this.taskId00));
    }

    @Test
    public void shouldPassUpIfExceptionDuringSuspend() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.13
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(this.taskId00Partitions);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions), Utils.mkEntry(this.taskId03, this.taskId03Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.14
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(topicPartition);
            }
        };
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean(false);
        final AtomicBoolean atomicBoolean3 = new AtomicBoolean(false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.15
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean.set(true);
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.16
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean2.set(true);
            }
        };
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.17
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }

            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void closeDirty() {
                super.closeDirty();
                atomicBoolean3.set(true);
            }
        };
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3, stateMachineTask4));
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2), Utils.mkEntry(this.taskId02, stateMachineTask3), Utils.mkEntry(this.taskId03, stateMachineTask4)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).completedChangelogs();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        })).getCause().getMessage(), Matchers.is("oops"));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean2.get()), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean3.get()), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator, Mockito.times(4))).closeAndRemoveTaskProducerIfNeeded((TaskId) ArgumentMatchers.any());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateTaskProducerExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.18
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(topicPartition);
            }
        };
        stateMachineTask.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null)));
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Collections.singletonList(stateMachineTask));
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("whatever")}).when(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).completedChangelogs();
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), Matchers.is("whatever"));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    @Test
    public void shouldCloseActiveTasksAndPropagateThreadProducerExceptionsOnCleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.19
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(topicPartition);
            }
        };
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Collections.singletonList(stateMachineTask));
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("whatever")}).when(this.activeTaskCreator)).closeThreadProducerIfNeeded();
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).completedChangelogs();
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.shutdown(true);
        });
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(runtimeException.getMessage(), Matchers.is("whatever"));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
    }

    @Test
    public void shouldOnlyCommitRevokedStandbyTaskAndPropagatePrepareCommitException() {
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, false);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.20
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("task 0_1 prepare commit boom!");
            }
        };
        stateMachineTask2.setCommitNeeded();
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.singletonMap(this.taskId00, this.taskId00Partitions));
        })).getCause().getMessage(), Matchers.is("task 0_1 prepare commit boom!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.allTasks(), Matchers.is(Collections.singletonMap(this.taskId00, stateMachineTask)));
    }

    @Test
    public void shouldSuspendAllRevokedActiveTasksAndPropagateSuspendException() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.21
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("task 0_1 suspend boom!");
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        this.taskManager.addTask(stateMachineTask3);
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(Utils.union(HashSet::new, new Set[]{this.taskId01Partitions, this.taskId02Partitions}));
        })).getCause().getMessage(), Matchers.is("task 0_1 suspend boom!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.SUSPENDED));
        Mockito.verifyNoInteractions(new Object[]{this.activeTaskCreator});
    }

    @Test
    public void shouldCloseActiveTasksAndIgnoreExceptionsOnUncleanShutdown() {
        final TopicPartition topicPartition = new TopicPartition("changelog", 0);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.22
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(topicPartition);
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.23
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.24
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("oops");
            }
        };
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("whatever")}).when(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded((TaskId) Mockito.any());
        ((ActiveTaskCreator) Mockito.doThrow(new Throwable[]{new RuntimeException("whatever all")}).when(this.activeTaskCreator)).closeThreadProducerIfNeeded();
        this.taskManager.handleAssignment(mkMap, Collections.emptyMap());
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CREATED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, stateMachineTask), Utils.mkEntry(this.taskId01, stateMachineTask2), Utils.mkEntry(this.taskId02, stateMachineTask3)})));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).enforceRestoreActive();
        ((ChangelogReader) Mockito.verify(this.changeLogReader)).completedChangelogs();
        this.taskManager.shutdown(false);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator, Mockito.times(3))).closeAndRemoveTaskProducerIfNeeded((TaskId) ArgumentMatchers.any());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    @Test
    public void shouldCloseStandbyTasksOnShutdown() {
        Map singletonMap = Collections.singletonMap(this.taskId00, this.taskId00Partitions);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        Mockito.when(this.standbyTaskCreator.createTasks(singletonMap)).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.expect(this.consumer.assignment()).andReturn(Collections.emptySet());
        this.consumer.resume((Collection) EasyMock.eq(Collections.emptySet()));
        EasyMock.expectLastCall();
        this.consumer.commitSync(Collections.emptyMap());
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), singletonMap);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        this.taskManager.shutdown(true);
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CLOSED));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
    }

    @Test
    public void shouldShutDownStateUpdaterAndCloseFailedTasksDirty() {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        Mockito.when(this.stateUpdater.drainExceptionsAndFailedTasks()).thenReturn(Arrays.asList(new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{task}), new RuntimeException()), new StateUpdater.ExceptionAndTasks(Utils.mkSet(new Task[]{(StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).build()}), new RuntimeException())));
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).shutdown(true);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(task.id());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).shutdown(Duration.ofMillis(Long.MAX_VALUE));
        ((StreamTask) Mockito.verify(task)).prepareCommit();
        ((StreamTask) Mockito.verify(task)).suspend();
        ((StreamTask) Mockito.verify(task)).closeDirty();
    }

    @Test
    public void shouldShutDownStateUpdaterAndAddRestoredTasksToTaskRegistry() {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        StreamTask build = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        StreamTask build2 = StreamsTestUtils.TaskBuilder.statefulTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RESTORING).build();
        Set mkSet = Utils.mkSet(new StreamTask[]{build, build2});
        Set set = (Set) mkSet.stream().map(streamTask -> {
            return streamTask;
        }).collect(Collectors.toSet());
        Mockito.when(this.stateUpdater.drainRestoredActiveTasks(Duration.ZERO)).thenReturn(mkSet);
        Mockito.when(tasksRegistry.activeTasks()).thenReturn(set);
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).shutdown(true);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build.id());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(build2.id());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).shutdown(Duration.ofMillis(Long.MAX_VALUE));
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addActiveTasks(set);
        ((StreamTask) Mockito.verify(build)).closeClean();
        ((StreamTask) Mockito.verify(build2)).closeClean();
    }

    @Test
    public void shouldShutDownStateUpdaterAndAddRemovedTasksToTaskRegistry() {
        TasksRegistry tasksRegistry = (TasksRegistry) Mockito.mock(TasksRegistry.class);
        Task task = (StreamTask) StreamsTestUtils.TaskBuilder.statefulTask(this.taskId01, this.taskId01ChangelogPartitions).inState(Task.State.RESTORING).build();
        Task task2 = (StandbyTask) StreamsTestUtils.TaskBuilder.standbyTask(this.taskId02, this.taskId02ChangelogPartitions).inState(Task.State.RUNNING).build();
        Mockito.when(this.stateUpdater.drainRemovedTasks()).thenReturn(Utils.mkSet(new Task[]{task2, task}));
        Mockito.when(tasksRegistry.activeTasks()).thenReturn(Utils.mkSet(new Task[]{task}));
        Mockito.when(tasksRegistry.allTasks()).thenReturn(Utils.mkSet(new Task[]{task, task2}));
        setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasksRegistry, true).shutdown(true);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(task.id());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeThreadProducerIfNeeded();
        ((StateUpdater) Mockito.verify(this.stateUpdater)).shutdown(Duration.ofMillis(Long.MAX_VALUE));
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addActiveTasks(Utils.mkSet(new Task[]{task}));
        ((TasksRegistry) Mockito.verify(tasksRegistry)).addStandbyTasks(Utils.mkSet(new Task[]{task2}));
        ((StreamTask) Mockito.verify(task)).closeClean();
        ((StandbyTask) Mockito.verify(task2)).closeClean();
    }

    @Test
    public void shouldInitializeNewActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId00, stateMachineTask)));
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.anEmptyMap());
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldInitialiseNewStandbyTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(this.taskManager.activeTaskMap(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(this.taskManager.standbyTaskMap(), Matchers.equalTo(Collections.singletonMap(this.taskId01, stateMachineTask)));
    }

    @Test
    public void shouldHandleRebalanceEvents() {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(this.consumer.assignment()).andReturn(singleton);
        this.consumer.pause(singleton);
        EasyMock.expectLastCall();
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(new ArrayList());
        EasyMock.replay(new Object[]{this.consumer, this.stateDirectory});
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.rebalanceInProgress()), Matchers.is(false));
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.rebalanceInProgress()), Matchers.is(true));
        this.taskManager.handleRebalanceComplete();
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.rebalanceInProgress()), Matchers.is(false));
    }

    @Test
    public void shouldCommitActiveAndStandbyTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask2));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(2));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldCommitProvidedTasksIfNeeded() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, false);
        StateMachineTask stateMachineTask5 = new StateMachineTask(this.taskId04, this.taskId04Partitions, false);
        StateMachineTask stateMachineTask6 = new StateMachineTask(this.taskId05, this.taskId05Partitions, false);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId03, this.taskId03Partitions), Utils.mkEntry(this.taskId04, this.taskId04Partitions), Utils.mkEntry(this.taskId05, this.taskId05Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3));
        Mockito.when(this.standbyTaskCreator.createTasks(mkMap2)).thenReturn(Arrays.asList(stateMachineTask4, stateMachineTask5, stateMachineTask6));
        this.consumer.commitSync((Map) EasyMock.eq(Collections.emptyMap()));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        stateMachineTask4.setCommitNeeded();
        stateMachineTask5.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commit(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask3, stateMachineTask4, stateMachineTask6}))), IsEqual.equalTo(2));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask2.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask3.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask4.commitNeeded), Matchers.is(false));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask5.commitNeeded), Matchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask6.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitOffsetsIfOnlyStandbyTasksAssigned() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, false);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(1));
        MatcherAssert.assertThat(Boolean.valueOf(stateMachineTask.commitNeeded), Matchers.is(false));
    }

    @Test
    public void shouldNotCommitActiveAndStandbyTasksWhileRebalanceInProgress() throws Exception {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, false);
        makeTaskFolders(this.taskId00.toString(), stateMachineTask2.toString());
        expectLockObtainedFor(this.taskId00, this.taskId01);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask2));
        EasyMock.replay(new Object[]{this.stateDirectory, this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        this.taskManager.handleRebalanceStart(Collections.emptySet());
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commitAll()), IsEqual.equalTo(-1));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.maybeCommitActiveTasksPerUserRequested()), IsEqual.equalTo(-1));
    }

    @Test
    public void shouldCommitViaConsumerIfEosDisabled() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.addTask(stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.commitAll();
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldCommitViaProducerIfEosAlphaEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.streamsProducerForTask((TaskId) ArgumentMatchers.any(TaskId.class))).thenReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, (String) null));
        shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, singletonMap, singletonMap2);
        ((StreamsProducer) Mockito.verify(streamsProducer)).commitTransaction(singletonMap, new ConsumerGroupMetadata("appId"));
        ((StreamsProducer) Mockito.verify(streamsProducer)).commitTransaction(singletonMap2, new ConsumerGroupMetadata("appId"));
        Mockito.verifyNoMoreInteractions(new Object[]{streamsProducer});
    }

    @Test
    public void shouldCommitViaProducerIfEosV2Enabled() {
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.threadProducer()).thenReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p2, new OffsetAndMetadata(1L, (String) null));
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, singletonMap, singletonMap2);
        ((StreamsProducer) Mockito.verify(streamsProducer)).commitTransaction(hashMap, new ConsumerGroupMetadata("appId"));
        Mockito.verifyNoMoreInteractions(new Object[]{streamsProducer});
    }

    private void shouldCommitViaProducerIfEosEnabled(StreamsConfigUtils.ProcessingMode processingMode, Map<TopicPartition, OffsetAndMetadata> map, Map<TopicPartition, OffsetAndMetadata> map2) {
        TaskManager upTaskManager = setUpTaskManager(processingMode, false);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(map);
        stateMachineTask.setCommitNeeded();
        upTaskManager.addTask(stateMachineTask);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask2.setCommittableOffsetsAndMetadata(map2);
        stateMachineTask2.setCommitNeeded();
        upTaskManager.addTask(stateMachineTask2);
        EasyMock.reset(new Object[]{this.consumer});
        EasyMock.expect(this.consumer.groupMetadata()).andStubReturn(new ConsumerGroupMetadata("appId"));
        EasyMock.replay(new Object[]{this.consumer});
        upTaskManager.commitAll();
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldPropagateExceptionFromActiveCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.25
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("opsh."));
    }

    @Test
    public void shouldPropagateExceptionFromStandbyCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.26
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
                throw new RuntimeException("opsh.");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId01Assignment)).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId01Assignment);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("opsh."));
    }

    @Test
    public void shouldSendPurgeData() {
        Mockito.when(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L)))).thenReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, completedFuture())));
        Mockito.when(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(17L)))).thenReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, completedFuture())));
        InOrder inOrder = Mockito.inOrder(new Object[]{this.adminClient});
        final HashMap hashMap = new HashMap();
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.27
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, Long> purgeableOffsets() {
                return hashMap;
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        hashMap.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        hashMap.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();
        ((Admin) inOrder.verify(this.adminClient)).deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L)));
        ((Admin) inOrder.verify(this.adminClient)).deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(17L)));
        inOrder.verifyNoMoreInteractions();
    }

    @Test
    public void shouldNotSendPurgeDataIfPreviousNotDone() {
        Mockito.when(this.adminClient.deleteRecords(Collections.singletonMap(this.t1p1, RecordsToDelete.beforeOffset(5L)))).thenReturn(new DeleteRecordsResult(Collections.singletonMap(this.t1p1, new KafkaFutureImpl())));
        final HashMap hashMap = new HashMap();
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.28
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Map<TopicPartition, Long> purgeableOffsets() {
                return hashMap;
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        hashMap.put(this.t1p1, 5L);
        this.taskManager.maybePurgeCommittedRecords();
        hashMap.put(this.t1p1, 17L);
        this.taskManager.maybePurgeCommittedRecords();
    }

    @Test
    public void shouldIgnorePurgeDataErrors() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        expectRestoreToBeCompleted(this.consumer);
        KafkaFutureImpl kafkaFutureImpl = new KafkaFutureImpl();
        DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(this.t1p1, kafkaFutureImpl));
        kafkaFutureImpl.completeExceptionally(new Exception("KABOOM!"));
        Mockito.when(this.adminClient.deleteRecords((Map) ArgumentMatchers.any())).thenReturn(deleteRecordsResult);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.addTask(stateMachineTask);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setPurgeableOffsets(Collections.singletonMap(this.t1p1, 5L));
        this.taskManager.maybePurgeCommittedRecords();
        this.taskManager.maybePurgeCommittedRecords();
    }

    @Test
    public void shouldMaybeCommitAllActiveTasksThatNeedCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.setCommittableOffsetsAndMetadata(Collections.singletonMap(this.t1p2, new OffsetAndMetadata(2L, (String) null)));
        StateMachineTask stateMachineTask4 = new StateMachineTask(this.taskId03, this.taskId03Partitions, true);
        StateMachineTask stateMachineTask5 = new StateMachineTask(this.taskId10, this.taskId10Partitions, false);
        HashMap hashMap = new HashMap();
        hashMap.putAll(singletonMap);
        hashMap.putAll(singletonMap2);
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId00, this.taskId00Partitions), Utils.mkEntry(this.taskId01, this.taskId01Partitions), Utils.mkEntry(this.taskId02, this.taskId02Partitions), Utils.mkEntry(this.taskId03, this.taskId03Partitions)});
        Map mkMap2 = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(this.taskId10, this.taskId10Partitions)});
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(mkMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2, stateMachineTask3, stateMachineTask4));
        Mockito.when(this.standbyTaskCreator.createTasks(mkMap2)).thenReturn(Collections.singletonList(stateMachineTask5));
        this.consumer.commitSync(hashMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(mkMap, mkMap2);
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask3.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask4.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask5.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.setCommitNeeded();
        stateMachineTask.setCommitRequested();
        stateMachineTask2.setCommitNeeded();
        stateMachineTask3.setCommitRequested();
        stateMachineTask4.setCommitNeeded();
        stateMachineTask4.setCommitRequested();
        stateMachineTask5.setCommitNeeded();
        stateMachineTask5.setCommitRequested();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.maybeCommitActiveTasksPerUserRequested()), IsEqual.equalTo(3));
    }

    @Test
    public void shouldProcessActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        HashMap hashMap = new HashMap();
        hashMap.put(this.taskId00, this.taskId00Partitions);
        hashMap.put(this.taskId01, this.taskId01Partitions);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.RUNNING));
        stateMachineTask.addRecords(this.t1p0, Arrays.asList(getConsumerRecord(this.t1p0, 0L), getConsumerRecord(this.t1p0, 1L), getConsumerRecord(this.t1p0, 2L), getConsumerRecord(this.t1p0, 3L), getConsumerRecord(this.t1p0, 4L), getConsumerRecord(this.t1p0, 5L)));
        stateMachineTask2.addRecords(this.t1p1, Arrays.asList(getConsumerRecord(this.t1p1, 0L), getConsumerRecord(this.t1p1, 1L), getConsumerRecord(this.t1p1, 2L), getConsumerRecord(this.t1p1, 3L), getConsumerRecord(this.t1p1, 4L)));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(6));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(5));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(3, this.time)), Matchers.is(0));
    }

    @Test
    public void shouldNotFailOnTimeoutException() {
        final AtomicReference atomicReference = new AtomicReference();
        atomicReference.set(new TimeoutException("Skip me!"));
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.transitionTo(Task.State.RESTORING);
        stateMachineTask.transitionTo(Task.State.RUNNING);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.29
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public boolean process(long j) {
                TimeoutException timeoutException = (TimeoutException) atomicReference.get();
                if (timeoutException != null) {
                    throw timeoutException;
                }
                return true;
            }
        };
        stateMachineTask2.transitionTo(Task.State.RESTORING);
        stateMachineTask2.transitionTo(Task.State.RUNNING);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        stateMachineTask3.transitionTo(Task.State.RESTORING);
        stateMachineTask3.transitionTo(Task.State.RUNNING);
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        this.taskManager.addTask(stateMachineTask3);
        stateMachineTask.addRecords(this.t1p0, Arrays.asList(getConsumerRecord(this.t1p0, 0L), getConsumerRecord(this.t1p0, 1L)));
        stateMachineTask2.addRecords(this.t1p1, Arrays.asList(getConsumerRecord(this.t1p1, 0L), getConsumerRecord(this.t1p1, 1L)));
        stateMachineTask3.addRecords(this.t1p2, Arrays.asList(getConsumerRecord(this.t1p2, 0L), getConsumerRecord(this.t1p2, 1L)));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(1, this.time)), Matchers.is(2));
        MatcherAssert.assertThat(stateMachineTask2.timeout, IsEqual.equalTo(Long.valueOf(this.time.milliseconds())));
        atomicReference.set(null);
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(1, this.time)), Matchers.is(3));
        MatcherAssert.assertThat(stateMachineTask2.timeout, IsEqual.equalTo((Object) null));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.process(1, this.time)), Matchers.is(1));
    }

    @Test
    public void shouldPropagateTaskMigratedExceptionsInProcessActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.30
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public boolean process(long j) {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        TopicPartition next = this.taskId00Partitions.iterator().next();
        stateMachineTask.addRecords(next, Collections.singletonList(getConsumerRecord(next, 0L)));
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.process(1, this.time);
        });
    }

    @Test
    public void shouldWrapRuntimeExceptionsInProcessActiveTasksAndSetTaskId() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.31
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public boolean process(long j) {
                throw new RuntimeException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        TopicPartition next = this.taskId00Partitions.iterator().next();
        stateMachineTask.addRecords(next, Collections.singletonList(getConsumerRecord(next, 0L)));
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.taskManager.process(1, this.time);
        });
        MatcherAssert.assertThat(Boolean.valueOf(assertThrows.taskId().isPresent()), Matchers.is(true));
        MatcherAssert.assertThat(assertThrows.taskId().get(), Matchers.is(this.taskId00));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.is("oops"));
    }

    @Test
    public void shouldPropagateTaskMigratedExceptionsInPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.32
            public boolean maybePunctuateStreamTime() {
                throw new TaskMigratedException("migrated", new RuntimeException("cause"));
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.punctuate();
        });
    }

    @Test
    public void shouldPropagateKafkaExceptionsInPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.33
            public boolean maybePunctuateStreamTime() {
                throw new KafkaException("oops");
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        Assert.assertThrows(KafkaException.class, () -> {
            this.taskManager.punctuate();
        });
    }

    @Test
    public void shouldPunctuateActiveTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.34
            public boolean maybePunctuateStreamTime() {
                return true;
            }

            public boolean maybePunctuateSystemTime() {
                return true;
            }
        };
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.punctuate()), IsEqual.equalTo(2));
    }

    @Test
    public void shouldReturnFalseWhenThereAreStillNonRunningTasks() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.35
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public Set<TopicPartition> changelogPartitions() {
                return Collections.singleton(new TopicPartition("fake", 0));
            }
        };
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(false));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RESTORING));
        EasyMock.verify(new Object[]{this.consumer});
    }

    @Test
    public void shouldHaveRemainingPartitionsUncleared() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        expectRestoreToBeCompleted(this.consumer);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(stateMachineTask));
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(TaskManager.class);
        Throwable th = null;
        try {
            LogCaptureAppender.setClassLoggerToDebug(TaskManager.class);
            this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
            MatcherAssert.assertThat(Boolean.valueOf(this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null)), Matchers.is(true));
            MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.RUNNING));
            this.taskManager.handleRevocation(Utils.mkSet(new TopicPartition[]{this.t1p0, new TopicPartition("unknown", 0)}));
            MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
            MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("taskManagerTestThe following revoked partitions [unknown-0] are missing from the current task partitions. It could potentially be due to race condition of consumer detecting the heartbeat failure, or the tasks have been cleaned up by the handleAssignment callback."));
            if (createAndRegister != null) {
                if (0 == 0) {
                    createAndRegister.close();
                    return;
                }
                try {
                    createAndRegister.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldThrowTaskMigratedWhenAllTaskCloseExceptionsAreTaskMigrated() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.36
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.37
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t2 close exception", new RuntimeException());
            }
        };
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        MatcherAssert.assertThat(Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        }).getMessage(), IsEqual.equalTo("t2 close exception; it means all tasks belonging to this thread should be migrated."));
    }

    @Test
    public void shouldThrowRuntimeExceptionWhenEncounteredUnknownExceptionDuringTaskClose() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.38
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.39
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new IllegalStateException("t2 illegal state exception", new RuntimeException());
            }
        };
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        RuntimeException runtimeException = (RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(runtimeException.getMessage(), IsEqual.equalTo("Encounter unexpected fatal error for task 0_2"));
        MatcherAssert.assertThat(runtimeException.getCause().getMessage(), IsEqual.equalTo("t2 illegal state exception"));
    }

    @Test
    public void shouldThrowSameKafkaExceptionWhenEncounteredDuringTaskClose() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.40
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new TaskMigratedException("t1 close exception", new RuntimeException());
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId02, this.taskId02Partitions, false) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.41
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new KafkaException("Kaboom for t2!", new RuntimeException());
            }
        };
        this.taskManager.addTask(stateMachineTask);
        this.taskManager.addTask(stateMachineTask2);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.taskManager.handleAssignment(Collections.emptyMap(), Collections.emptyMap());
        });
        MatcherAssert.assertThat(Boolean.valueOf(assertThrows.taskId().isPresent()), Matchers.is(true));
        MatcherAssert.assertThat(assertThrows.taskId().get(), Matchers.is(this.taskId02));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), IsEqual.equalTo("Kaboom for t2!"));
    }

    @Test
    public void shouldTransmitProducerMetrics() {
        MetricName metricName = new MetricName("test_metric", "", "", new HashMap());
        Map singletonMap = Collections.singletonMap(metricName, new KafkaMetric(new Object(), metricName, (metricConfig, j) -> {
            return 0.0d;
        }, (MetricConfig) null, new MockTime()));
        Mockito.when(this.activeTaskCreator.producerMetrics()).thenReturn(singletonMap);
        MatcherAssert.assertThat(this.taskManager.producerMetrics(), Matchers.is(singletonMap));
    }

    private Map<TaskId, StateMachineTask> handleAssignment(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2, Map<TaskId, Set<TopicPartition>> map3) {
        Set<Task> set = (Set) map.entrySet().stream().map(entry -> {
            return new StateMachineTask((TaskId) entry.getKey(), (Set) entry.getValue(), true);
        }).collect(Collectors.toSet());
        Set<Task> set2 = (Set) map2.entrySet().stream().map(entry2 -> {
            return new StateMachineTask((TaskId) entry2.getKey(), (Set) entry2.getValue(), false);
        }).collect(Collectors.toSet());
        Set<Task> set3 = (Set) map3.entrySet().stream().map(entry3 -> {
            return new StateMachineTask((TaskId) entry3.getKey(), (Set) entry3.getValue(), true);
        }).collect(Collectors.toSet());
        set3.forEach(task -> {
            ((StateMachineTask) task).setChangelogOffsets(Collections.singletonMap(new TopicPartition("changelog", 0), 0L));
        });
        HashMap hashMap = new HashMap(map);
        hashMap.putAll(map3);
        HashSet hashSet = new HashSet(set);
        hashSet.addAll(set3);
        Mockito.when(this.standbyTaskCreator.createTasks(map2)).thenReturn(set2);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(hashSet);
        expectRestoreToBeCompleted(this.consumer);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, map2);
        this.taskManager.tryToCompleteRestoration(this.time.milliseconds(), (java.util.function.Consumer) null);
        HashMap hashMap2 = new HashMap();
        for (Task task2 : set) {
            MatcherAssert.assertThat(task2.state(), Matchers.is(Task.State.RUNNING));
            hashMap2.put(task2.id(), (StateMachineTask) task2);
        }
        for (Task task3 : set3) {
            MatcherAssert.assertThat(task3.state(), Matchers.is(Task.State.RESTORING));
            hashMap2.put(task3.id(), (StateMachineTask) task3);
        }
        for (Task task4 : set2) {
            MatcherAssert.assertThat(task4.state(), Matchers.is(Task.State.RUNNING));
            hashMap2.put(task4.id(), (StateMachineTask) task4);
        }
        return hashMap2;
    }

    private void expectLockObtainedFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(taskId))).andReturn(true).once();
        }
    }

    private void expectLockFailedFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(taskId))).andReturn(false).once();
        }
    }

    private void expectUnlockFor(TaskId... taskIdArr) throws Exception {
        for (TaskId taskId : taskIdArr) {
            this.stateDirectory.unlock(taskId);
            EasyMock.expectLastCall();
        }
    }

    private static void expectConsumerAssignmentPaused(Consumer<byte[], byte[]> consumer) {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(consumer.assignment()).andReturn(singleton);
        consumer.pause(singleton);
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnCommitFailed() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.addTask(stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new CommitFailedException());
        EasyMock.replay(new Object[]{this.consumer});
        TaskMigratedException assertThrows = Assert.assertThrows(TaskMigratedException.class, () -> {
            this.taskManager.commitAll();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.instanceOf(CommitFailedException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Consumer committing offsets failed, indicating the corresponding thread is no longer part of the group; it means all tasks belonging to this thread should be migrated."));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldNotFailForTimeoutExceptionOnConsumerCommit() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata((Map) this.taskId00Partitions.stream().collect(Collectors.toMap(topicPartition -> {
            return topicPartition;
        }, topicPartition2 -> {
            return new OffsetAndMetadata(0L);
        })));
        stateMachineTask2.setCommittableOffsetsAndMetadata((Map) this.taskId00Partitions.stream().collect(Collectors.toMap(topicPartition3 -> {
            return topicPartition3;
        }, topicPartition4 -> {
            return new OffsetAndMetadata(0L);
        })));
        this.consumer.commitSync((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall().andThrow(new TimeoutException("KABOOM!"));
        this.consumer.commitSync((Map) EasyMock.anyObject(Map.class));
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.consumer});
        stateMachineTask.setCommitNeeded();
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commit(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask2}))), IsEqual.equalTo(0));
        MatcherAssert.assertThat(stateMachineTask.timeout, IsEqual.equalTo(Long.valueOf(this.time.milliseconds())));
        Assert.assertNull(stateMachineTask2.timeout);
        MatcherAssert.assertThat(Integer.valueOf(this.taskManager.commit(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask2}))), IsEqual.equalTo(1));
        Assert.assertNull(stateMachineTask.timeout);
        Assert.assertNull(stateMachineTask2.timeout);
    }

    @Test
    public void shouldNotFailForTimeoutExceptionOnCommitWithEosAlpha() {
        Tasks tasks = (Tasks) Mockito.mock(Tasks.class);
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_ALPHA, tasks, false);
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.streamsProducerForTask((TaskId) ArgumentMatchers.any(TaskId.class))).thenReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        ((StreamsProducer) Mockito.doThrow(new Throwable[]{new TimeoutException("KABOOM!")}).doNothing().doNothing().doNothing().when(streamsProducer)).commitTransaction(singletonMap, (ConsumerGroupMetadata) null);
        ((StreamsProducer) Mockito.doNothing().doNothing().when(streamsProducer)).commitTransaction(singletonMap2, (ConsumerGroupMetadata) null);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        Mockito.when(tasks.allTasks()).thenReturn(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask2, stateMachineTask3}));
        EasyMock.expect(this.consumer.groupMetadata()).andStubReturn((Object) null);
        EasyMock.replay(new Object[]{this.consumer});
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        MatcherAssert.assertThat(Assert.assertThrows(TaskCorruptedException.class, () -> {
            upTaskManager.commit(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask2, stateMachineTask3}));
        }).corruptedTasks(), IsEqual.equalTo(Collections.singleton(this.taskId00)));
    }

    @Test
    public void shouldThrowTaskCorruptedExceptionForTimeoutExceptionOnCommitWithEosV2() {
        TaskManager upTaskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2, false);
        StreamsProducer streamsProducer = (StreamsProducer) Mockito.mock(StreamsProducer.class);
        Mockito.when(this.activeTaskCreator.threadProducer()).thenReturn(streamsProducer);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        Map<TopicPartition, OffsetAndMetadata> singletonMap2 = Collections.singletonMap(this.t1p1, new OffsetAndMetadata(1L, (String) null));
        HashMap hashMap = new HashMap(singletonMap);
        hashMap.putAll(singletonMap2);
        ((StreamsProducer) Mockito.doThrow(new Throwable[]{new TimeoutException("KABOOM!")}).doNothing().when(streamsProducer)).commitTransaction(hashMap, (ConsumerGroupMetadata) null);
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true);
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        stateMachineTask2.setCommittableOffsetsAndMetadata(singletonMap2);
        StateMachineTask stateMachineTask3 = new StateMachineTask(this.taskId02, this.taskId02Partitions, true);
        EasyMock.expect(this.consumer.groupMetadata()).andStubReturn((Object) null);
        EasyMock.replay(new Object[]{this.consumer});
        stateMachineTask.setCommitNeeded();
        stateMachineTask2.setCommitNeeded();
        MatcherAssert.assertThat(Assert.assertThrows(TaskCorruptedException.class, () -> {
            upTaskManager.commit(Utils.mkSet(new Task[]{stateMachineTask, stateMachineTask2, stateMachineTask3}));
        }).corruptedTasks(), IsEqual.equalTo(Utils.mkSet(new TaskId[]{this.taskId00, this.taskId01})));
    }

    @Test
    public void shouldStreamsExceptionOnCommitError() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.addTask(stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new KafkaException());
        EasyMock.replay(new Object[]{this.consumer});
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.taskManager.commitAll();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), Matchers.instanceOf(KafkaException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered committing offsets via consumer"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldFailOnCommitFatal() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(this.t1p0, new OffsetAndMetadata(0L, (String) null));
        stateMachineTask.setCommittableOffsetsAndMetadata(singletonMap);
        stateMachineTask.setCommitNeeded();
        this.taskManager.addTask(stateMachineTask);
        this.consumer.commitSync(singletonMap);
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM"));
        EasyMock.replay(new Object[]{this.consumer});
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.commitAll();
        })).getMessage(), IsEqual.equalTo("KABOOM"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.CREATED));
    }

    @Test
    public void shouldSuspendAllTasksButSkipCommitIfSuspendingFailsDuringRevocation() {
        StateMachineTask stateMachineTask = new StateMachineTask(this.taskId00, this.taskId00Partitions, true) { // from class: org.apache.kafka.streams.processor.internals.TaskManagerTest.42
            @Override // org.apache.kafka.streams.processor.internals.TaskManagerTest.StateMachineTask
            public void suspend() {
                super.suspend();
                throw new RuntimeException("KABOOM!");
            }
        };
        StateMachineTask stateMachineTask2 = new StateMachineTask(this.taskId01, this.taskId01Partitions, true);
        HashMap hashMap = new HashMap(this.taskId00Assignment);
        hashMap.putAll(this.taskId01Assignment);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(hashMap))).thenReturn(Arrays.asList(stateMachineTask, stateMachineTask2));
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(hashMap, Collections.emptyMap());
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(RuntimeException.class, () -> {
            this.taskManager.handleRevocation(Arrays.asList(this.t1p0, this.t1p1));
        })).getCause().getMessage(), Matchers.is("KABOOM!"));
        MatcherAssert.assertThat(stateMachineTask.state(), Matchers.is(Task.State.SUSPENDED));
        MatcherAssert.assertThat(stateMachineTask2.state(), Matchers.is(Task.State.SUSPENDED));
    }

    @Test
    public void shouldConvertActiveTaskToStandbyTask() {
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.id()).thenReturn(this.taskId00);
        Mockito.when(streamTask.inputPartitions()).thenReturn(this.taskId00Partitions);
        Mockito.when(Boolean.valueOf(streamTask.isActive())).thenReturn(true);
        StandbyTask standbyTask = (StandbyTask) Mockito.mock(StandbyTask.class);
        Mockito.when(standbyTask.id()).thenReturn(this.taskId00);
        Mockito.when(this.activeTaskCreator.createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(this.taskId00Assignment))).thenReturn(Collections.singletonList(streamTask));
        Mockito.when(this.standbyTaskCreator.createStandbyTaskFromActive((StreamTask) Mockito.any(), (Set) Mockito.eq(this.taskId00Partitions))).thenReturn(standbyTask);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).closeAndRemoveTaskProducerIfNeeded(this.taskId00);
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator)).createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(Collections.emptyMap()));
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator, Mockito.times(2))).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldConvertStandbyTaskToActiveTask() {
        StandbyTask standbyTask = (StandbyTask) Mockito.mock(StandbyTask.class);
        Mockito.when(standbyTask.id()).thenReturn(this.taskId00);
        Mockito.when(Boolean.valueOf(standbyTask.isActive())).thenReturn(false);
        Mockito.when(standbyTask.prepareCommit()).thenReturn(Collections.emptyMap());
        StreamTask streamTask = (StreamTask) Mockito.mock(StreamTask.class);
        Mockito.when(streamTask.id()).thenReturn(this.taskId00);
        Mockito.when(streamTask.inputPartitions()).thenReturn(this.taskId00Partitions);
        Mockito.when(this.standbyTaskCreator.createTasks(this.taskId00Assignment)).thenReturn(Collections.singletonList(standbyTask));
        Mockito.when(this.activeTaskCreator.createActiveTaskFromStandby((StandbyTask) Mockito.eq(standbyTask), (Set) Mockito.eq(this.taskId00Partitions), (Consumer) ArgumentMatchers.any())).thenReturn(streamTask);
        EasyMock.replay(new Object[]{this.consumer});
        this.taskManager.handleAssignment(Collections.emptyMap(), this.taskId00Assignment);
        this.taskManager.handleAssignment(this.taskId00Assignment, Collections.emptyMap());
        ((ActiveTaskCreator) Mockito.verify(this.activeTaskCreator, Mockito.times(2))).createTasks((Consumer) ArgumentMatchers.any(), (Map) Mockito.eq(Collections.emptyMap()));
        ((StandbyTaskCreator) Mockito.verify(this.standbyTaskCreator)).createTasks(Collections.emptyMap());
    }

    @Test
    public void shouldListNotPausedTasks() {
        handleAssignment(this.taskId00Assignment, this.taskId01Assignment, Collections.emptyMap());
        Assert.assertEquals(this.taskManager.notPausedTasks().size(), 2L);
        this.topologyMetadata.pauseTopology("__UNNAMED_TOPOLOGY__");
        Assert.assertEquals(this.taskManager.notPausedTasks().size(), 0L);
    }

    private static void expectRestoreToBeCompleted(Consumer<byte[], byte[]> consumer) {
        Set singleton = Collections.singleton(new TopicPartition("assignment", 0));
        EasyMock.expect(consumer.assignment()).andReturn(singleton);
        consumer.resume(singleton);
        EasyMock.expectLastCall();
    }

    private static KafkaFutureImpl<DeletedRecords> completedFuture() {
        KafkaFutureImpl<DeletedRecords> kafkaFutureImpl = new KafkaFutureImpl<>();
        kafkaFutureImpl.complete((Object) null);
        return kafkaFutureImpl;
    }

    private void makeTaskFolders(String... strArr) throws Exception {
        ArrayList arrayList = new ArrayList(strArr.length);
        for (String str : strArr) {
            arrayList.add(new StateDirectory.TaskDirectory(this.testFolder.newFolder(str), (String) null));
        }
        EasyMock.expect(this.stateDirectory.listNonEmptyTaskDirectories()).andReturn(arrayList).once();
    }

    private void writeCheckpointFile(TaskId taskId, Map<TopicPartition, Long> map) throws Exception {
        File checkpointFile = getCheckpointFile(taskId);
        Files.createFile(checkpointFile.toPath(), new FileAttribute[0]);
        new OffsetCheckpoint(checkpointFile).write(map);
        EasyMock.expect(this.stateDirectory.checkpointFileFor(taskId)).andReturn(checkpointFile);
    }

    private File getCheckpointFile(TaskId taskId) {
        return new File(new File(this.testFolder.getRoot(), taskId.toString()), ".checkpoint");
    }

    private static ConsumerRecord<byte[], byte[]> getConsumerRecord(TopicPartition topicPartition, long j) {
        return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), j, (Object) null, (Object) null);
    }
}
