package org.apache.kafka.streams.processor.internals;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsRebalanceListener.class */
public class StreamsRebalanceListener implements ConsumerRebalanceListener {
    private final Time time;
    private final TaskManager taskManager;
    private final StreamThread streamThread;
    private final Logger log;
    private final AtomicInteger assignmentErrorCode;

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamsRebalanceListener(Time time, TaskManager taskManager, StreamThread streamThread, Logger logger, AtomicInteger atomicInteger) {
        this.time = time;
        this.taskManager = taskManager;
        this.streamThread = streamThread;
        this.log = logger;
        this.assignmentErrorCode = atomicInteger;
    }

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (this.assignmentErrorCode.get() == AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()) {
            this.log.error("Received error code {}. {}", AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.codeName(), AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.description());
            this.taskManager.handleRebalanceComplete();
            throw new MissingSourceTopicException("One or more source topics were missing during rebalance");
        }
        if (this.assignmentErrorCode.get() == AssignorError.VERSION_PROBING.code()) {
            this.log.info("Received version probing code {}", AssignorError.VERSION_PROBING);
        } else {
            if (this.assignmentErrorCode.get() == AssignorError.ASSIGNMENT_ERROR.code()) {
                this.log.error("Received error code {}", AssignorError.ASSIGNMENT_ERROR);
                this.taskManager.handleRebalanceComplete();
                throw new TaskAssignmentException("Hit an unexpected exception during task assignment phase of rebalance");
            }
            if (this.assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
                this.log.error("A Kafka Streams client in this Kafka Streams application is requesting to shutdown the application");
                this.taskManager.handleRebalanceComplete();
                this.streamThread.shutdownToError();
                return;
            } else if (this.assignmentErrorCode.get() != AssignorError.NONE.code()) {
                this.log.error("Received unknown error code {}", Integer.valueOf(this.assignmentErrorCode.get()));
                throw new TaskAssignmentException("Hit an unrecognized exception during rebalance");
            }
        }
        this.streamThread.setState(StreamThread.State.PARTITIONS_ASSIGNED);
        this.streamThread.setPartitionAssignedTime(this.time.milliseconds());
        this.taskManager.handleRebalanceComplete();
    }

    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        this.log.debug("Current state {}: revoked partitions {} because of consumer rebalance.\n\tcurrently assigned active tasks: {}\n\tcurrently assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), collection, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        if ((this.streamThread.setState(StreamThread.State.PARTITIONS_REVOKED) != null || this.streamThread.state() == StreamThread.State.PENDING_SHUTDOWN) && !collection.isEmpty()) {
            long milliseconds = this.time.milliseconds();
            try {
                this.taskManager.handleRevocation(collection);
                this.log.info("partition revocation took {} ms.", Long.valueOf(this.time.milliseconds() - milliseconds));
            } catch (Throwable th) {
                this.log.info("partition revocation took {} ms.", Long.valueOf(this.time.milliseconds() - milliseconds));
                throw th;
            }
        }
    }

    public void onPartitionsLost(Collection<TopicPartition> collection) {
        this.log.info("at state {}: partitions {} lost due to missed rebalance.\n\tlost active tasks: {}\n\tlost assigned standby tasks: {}\n", new Object[]{this.streamThread.state(), collection, this.taskManager.activeTaskIds(), this.taskManager.standbyTaskIds()});
        long milliseconds = this.time.milliseconds();
        try {
            this.taskManager.handleLostAll();
            this.log.info("partitions lost took {} ms.", Long.valueOf(this.time.milliseconds() - milliseconds));
        } catch (Throwable th) {
            this.log.info("partitions lost took {} ms.", Long.valueOf(this.time.milliseconds() - milliseconds));
            throw th;
        }
    }
}
