package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.LoggerFactory;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsRebalanceListenerTest.class */
public class StreamsRebalanceListenerTest {

    @Mock
    private TaskManager taskManager;

    @Mock
    private StreamThread streamThread;
    private final AtomicInteger assignmentErrorCode = new AtomicInteger();
    private final MockTime time = new MockTime();
    private StreamsRebalanceListener streamsRebalanceListener;

    @Before
    public void setup() {
        this.streamsRebalanceListener = new StreamsRebalanceListener(this.time, this.taskManager, this.streamThread, LoggerFactory.getLogger(StreamsRebalanceListenerTest.class), this.assignmentErrorCode);
    }

    @Test
    public void shouldThrowMissingSourceTopicException() {
        this.assignmentErrorCode.set(AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code());
        MatcherAssert.assertThat(Assert.assertThrows(MissingSourceTopicException.class, () -> {
            this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        }).getMessage(), CoreMatchers.is("One or more source topics were missing during rebalance"));
        ((TaskManager) Mockito.verify(this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldSwallowVersionProbingError() {
        this.assignmentErrorCode.set(AssignorError.VERSION_PROBING.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((StreamThread) Mockito.verify(this.streamThread)).setState(StreamThread.State.PARTITIONS_ASSIGNED);
        ((StreamThread) Mockito.verify(this.streamThread)).setPartitionAssignedTime(this.time.milliseconds());
        ((TaskManager) Mockito.verify(this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldSendShutdown() {
        this.assignmentErrorCode.set(AssignorError.SHUTDOWN_REQUESTED.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((TaskManager) Mockito.verify(this.taskManager)).handleRebalanceComplete();
        ((StreamThread) Mockito.verify(this.streamThread)).shutdownToError();
    }

    @Test
    public void shouldThrowTaskAssignmentException() {
        this.assignmentErrorCode.set(AssignorError.ASSIGNMENT_ERROR.code());
        MatcherAssert.assertThat(Assert.assertThrows(TaskAssignmentException.class, () -> {
            this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        }).getMessage(), CoreMatchers.is("Hit an unexpected exception during task assignment phase of rebalance"));
        ((TaskManager) Mockito.verify(this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldThrowTaskAssignmentExceptionOnUnrecognizedErrorCode() {
        this.assignmentErrorCode.set(Integer.MAX_VALUE);
        MatcherAssert.assertThat(Assert.assertThrows(TaskAssignmentException.class, () -> {
            this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        }).getMessage(), CoreMatchers.is("Hit an unrecognized exception during rebalance"));
    }

    @Test
    public void shouldHandleAssignedPartitions() {
        this.assignmentErrorCode.set(AssignorError.NONE.code());
        this.streamsRebalanceListener.onPartitionsAssigned(Collections.emptyList());
        ((StreamThread) Mockito.verify(this.streamThread)).setState(StreamThread.State.PARTITIONS_ASSIGNED);
        ((StreamThread) Mockito.verify(this.streamThread)).setPartitionAssignedTime(this.time.milliseconds());
        ((TaskManager) Mockito.verify(this.taskManager)).handleRebalanceComplete();
    }

    @Test
    public void shouldHandleRevokedPartitions() {
        List singletonList = Collections.singletonList(new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0));
        Mockito.when(this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn(StreamThread.State.RUNNING);
        this.streamsRebalanceListener.onPartitionsRevoked(singletonList);
        ((TaskManager) Mockito.verify(this.taskManager)).handleRevocation(singletonList);
    }

    @Test
    public void shouldNotHandleRevokedPartitionsIfStateCannotTransitToPartitionRevoked() {
        Mockito.when(this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn((Object) null);
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.singletonList(new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0)));
        ((TaskManager) Mockito.verify(this.taskManager, Mockito.never())).handleRevocation((Collection) ArgumentMatchers.any());
    }

    @Test
    public void shouldNotHandleEmptySetOfRevokedPartitions() {
        Mockito.when(this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED)).thenReturn(StreamThread.State.RUNNING);
        this.streamsRebalanceListener.onPartitionsRevoked(Collections.emptyList());
        ((TaskManager) Mockito.verify(this.taskManager, Mockito.never())).handleRevocation((Collection) ArgumentMatchers.any());
    }

    @Test
    public void shouldHandleLostPartitions() {
        this.streamsRebalanceListener.onPartitionsLost(Collections.singletonList(new TopicPartition(AssignmentTestUtils.TOPIC_PREFIX, 0)));
        ((TaskManager) Mockito.verify(this.taskManager)).handleLostAll();
    }
}
