package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.TaskId;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AssignedTasksTest.class */
public class AssignedTasksTest {
    private final Task t1 = (Task) EasyMock.createMock(Task.class);
    private final Task t2 = (Task) EasyMock.createMock(Task.class);
    private final TopicPartition tp1 = new TopicPartition("t1", 0);
    private final TopicPartition tp2 = new TopicPartition("t2", 0);
    private final TopicPartition changeLog1 = new TopicPartition("cl1", 0);
    private final TopicPartition changeLog2 = new TopicPartition("cl2", 0);
    private final TaskId taskId1 = new TaskId(0, 0);
    private final TaskId taskId2 = new TaskId(1, 0);
    private AssignedTasks assignedTasks;

    @Before
    public void before() {
        this.assignedTasks = new AssignedTasks(new LogContext("log "), "task");
        EasyMock.expect(this.t1.id()).andReturn(this.taskId1).anyTimes();
        EasyMock.expect(this.t2.id()).andReturn(this.taskId2).anyTimes();
    }

    @Test
    public void shouldGetPartitionsFromNewTasksThatHaveStateStores() {
        EasyMock.expect(Boolean.valueOf(this.t1.hasStateStores())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.t2.hasStateStores())).andReturn(true);
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1));
        EasyMock.expect(this.t2.partitions()).andReturn(Collections.singleton(this.tp2));
        EasyMock.replay(new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        MatcherAssert.assertThat(this.assignedTasks.uninitializedPartitions(), IsEqual.equalTo(Utils.mkSet(new TopicPartition[]{this.tp1, this.tp2})));
        EasyMock.verify(new Object[]{this.t1, this.t2});
    }

    @Test
    public void shouldNotGetPartitionsFromNewTasksWithoutStateStores() {
        EasyMock.expect(Boolean.valueOf(this.t1.hasStateStores())).andReturn(false);
        EasyMock.expect(Boolean.valueOf(this.t2.hasStateStores())).andReturn(false);
        EasyMock.replay(new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        Assert.assertTrue(this.assignedTasks.uninitializedPartitions().isEmpty());
        EasyMock.verify(new Object[]{this.t1, this.t2});
    }

    @Test
    public void shouldInitializeNewTasks() {
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(false);
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1));
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(false);
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1));
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Collections.emptySet());
        EasyMock.expect(Boolean.valueOf(this.t2.initialize())).andReturn(true);
        Set singleton = Collections.singleton(this.tp2);
        EasyMock.expect(this.t2.partitions()).andReturn(singleton);
        EasyMock.expect(this.t2.changelogPartitions()).andReturn(Collections.emptyList());
        EasyMock.expect(Boolean.valueOf(this.t2.hasStateStores())).andReturn(true);
        EasyMock.replay(new Object[]{this.t1, this.t2});
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.addNewTask(this.t2);
        Set initializeNewTasks = this.assignedTasks.initializeNewTasks();
        Collection restoringTasks = this.assignedTasks.restoringTasks();
        MatcherAssert.assertThat(Integer.valueOf(restoringTasks.size()), IsEqual.equalTo(1));
        Assert.assertSame(restoringTasks.iterator().next(), this.t1);
        MatcherAssert.assertThat(initializeNewTasks, IsEqual.equalTo(singleton));
    }

    @Test
    public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
        EasyMock.expect(Boolean.valueOf(this.t2.initialize())).andReturn(true);
        EasyMock.expect(this.t2.partitions()).andReturn(Collections.singleton(this.tp2));
        EasyMock.expect(this.t2.changelogPartitions()).andReturn(Collections.emptyList());
        EasyMock.expect(Boolean.valueOf(this.t2.hasStateStores())).andReturn(false);
        EasyMock.replay(new Object[]{this.t2});
        this.assignedTasks.addNewTask(this.t2);
        Set initializeNewTasks = this.assignedTasks.initializeNewTasks();
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId2)));
        MatcherAssert.assertThat(initializeNewTasks, IsEqual.equalTo(Collections.emptySet()));
    }

    @Test
    public void shouldTransitionFullyRestoredTasksToRunning() {
        Set mkSet = Utils.mkSet(new TopicPartition[]{this.tp1});
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(false);
        EasyMock.expect(this.t1.partitions()).andReturn(mkSet).anyTimes();
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Utils.mkSet(new TopicPartition[]{this.changeLog1, this.changeLog2})).anyTimes();
        EasyMock.expect(Boolean.valueOf(this.t1.hasStateStores())).andReturn(true).anyTimes();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        Assert.assertTrue(this.assignedTasks.updateRestored(Utils.mkSet(new TopicPartition[]{this.changeLog1})).isEmpty());
        MatcherAssert.assertThat(this.assignedTasks.updateRestored(Utils.mkSet(new TopicPartition[]{this.changeLog2})), IsEqual.equalTo(mkSet));
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId1)));
    }

    @Test
    public void shouldSuspendRunningTasks() {
        mockRunningTaskSuspension();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        MatcherAssert.assertThat(this.assignedTasks.previousTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseRestoringTasks() {
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(false);
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1));
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Collections.emptySet());
        this.t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldClosedUnInitializedTasksOnSuspend() {
        this.t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        this.assignedTasks.addNewTask(this.t1);
        MatcherAssert.assertThat(this.assignedTasks.suspend(), CoreMatchers.nullValue());
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldNotSuspendSuspendedTasks() {
        mockRunningTaskSuspension();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        MatcherAssert.assertThat(this.assignedTasks.suspend(), CoreMatchers.nullValue());
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnSuspendWhenRuntimeException() {
        mockTaskInitialization();
        this.t1.suspend();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!"));
        this.t1.close(false, false);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.not(CoreMatchers.nullValue()));
        MatcherAssert.assertThat(this.assignedTasks.previousTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnSuspendIfTaskMigratedException() {
        mockTaskInitialization();
        this.t1.suspend();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        Assert.assertTrue(this.assignedTasks.previousTaskIds().isEmpty());
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldResumeMatchingSuspendedTasks() {
        mockRunningTaskSuspension();
        this.t1.resume();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        Assert.assertTrue(this.assignedTasks.maybeResumeSuspendedTask(this.taskId1, Collections.singleton(this.tp1)));
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnResumeIfTaskMigratedException() {
        mockRunningTaskSuspension();
        this.t1.resume();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        MatcherAssert.assertThat(suspendTask(), CoreMatchers.nullValue());
        try {
            this.assignedTasks.maybeResumeSuspendedTask(this.taskId1, Collections.singleton(this.tp1));
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.EMPTY_SET));
        EasyMock.verify(new Object[]{this.t1});
    }

    private void mockTaskInitialization() {
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(true);
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1));
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Collections.emptyList());
        EasyMock.expect(Boolean.valueOf(this.t1.hasStateStores())).andReturn(false);
    }

    @Test
    public void shouldCommitRunningTasks() {
        mockTaskInitialization();
        this.t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        this.assignedTasks.commit();
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnCommitIfTaskMigratedException() {
        mockTaskInitialization();
        this.t1.commit();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.commit();
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.EMPTY_SET));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldThrowExceptionOnCommitWhenNotCommitFailedOrProducerFenced() {
        mockTaskInitialization();
        this.t1.commit();
        EasyMock.expectLastCall().andThrow(new RuntimeException(""));
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.commit();
            Assert.fail("Should have thrown exception");
        } catch (Exception e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.singleton(this.taskId1)));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCommitRunningTasksIfNeeded() {
        mockTaskInitialization();
        EasyMock.expect(Boolean.valueOf(this.t1.commitNeeded())).andReturn(true);
        this.t1.commit();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        MatcherAssert.assertThat(Integer.valueOf(this.assignedTasks.maybeCommit()), IsEqual.equalTo(1));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnMaybeCommitIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(Boolean.valueOf(this.t1.commitNeeded())).andReturn(true);
        this.t1.commit();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.maybeCommit();
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.EMPTY_SET));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnProcessesIfTaskMigratedException() {
        mockTaskInitialization();
        this.t1.process();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.process();
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.EMPTY_SET));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldPunctuateRunningTasks() {
        mockTaskInitialization();
        EasyMock.expect(Boolean.valueOf(this.t1.maybePunctuateStreamTime())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.t1.maybePunctuateSystemTime())).andReturn(true);
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        MatcherAssert.assertThat(Integer.valueOf(this.assignedTasks.punctuate()), IsEqual.equalTo(2));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnMaybePunctuateStreamTimeIfTaskMigratedException() {
        mockTaskInitialization();
        this.t1.maybePunctuateStreamTime();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.punctuate();
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        MatcherAssert.assertThat(this.assignedTasks.runningTaskIds(), IsEqual.equalTo(Collections.EMPTY_SET));
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldCloseTaskOnMaybePunctuateSystemTimeIfTaskMigratedException() {
        mockTaskInitialization();
        EasyMock.expect(Boolean.valueOf(this.t1.maybePunctuateStreamTime())).andReturn(true);
        this.t1.maybePunctuateSystemTime();
        EasyMock.expectLastCall().andThrow(new TaskMigratedException(this.t1));
        this.t1.close(false, true);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        try {
            this.assignedTasks.punctuate();
            Assert.fail("Should have thrown TaskMigratedException.");
        } catch (TaskMigratedException e) {
        }
        EasyMock.verify(new Object[]{this.t1});
    }

    @Test
    public void shouldReturnNumberOfPunctuations() {
        mockTaskInitialization();
        EasyMock.expect(Boolean.valueOf(this.t1.maybePunctuateStreamTime())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.t1.maybePunctuateSystemTime())).andReturn(false);
        EasyMock.replay(new Object[]{this.t1});
        addAndInitTask();
        MatcherAssert.assertThat(Integer.valueOf(this.assignedTasks.punctuate()), IsEqual.equalTo(1));
        EasyMock.verify(new Object[]{this.t1});
    }

    private void addAndInitTask() {
        this.assignedTasks.addNewTask(this.t1);
        this.assignedTasks.initializeNewTasks();
    }

    private RuntimeException suspendTask() {
        addAndInitTask();
        return this.assignedTasks.suspend();
    }

    private void mockRunningTaskSuspension() {
        EasyMock.expect(Boolean.valueOf(this.t1.initialize())).andReturn(true);
        EasyMock.expect(Boolean.valueOf(this.t1.hasStateStores())).andReturn(false).anyTimes();
        EasyMock.expect(this.t1.partitions()).andReturn(Collections.singleton(this.tp1)).anyTimes();
        EasyMock.expect(this.t1.changelogPartitions()).andReturn(Collections.emptyList()).anyTimes();
        this.t1.suspend();
        EasyMock.expectLastCall();
    }
}
