package org.apache.kafka.streams.processor.internals.tasks;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ReadOnlyTask;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.TaskExecutionMetadata;
import org.apache.kafka.streams.processor.internals.TasksRegistry;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.class */
/**
 * {@link TaskManager} implementation that hands active stream tasks out to a fixed set of
 * {@link TaskExecutor}s.
 *
 * <p>All mutable state of this class (the task registry, the assignment map, the list of locked
 * task ids and the recorded uncaught exceptions) is read and written only while holding
 * {@link #tasksLock}, via {@link #executeWithTasksLocked(Runnable)} and
 * {@link #returnWithTasksLocked(Supplier)}. The lock is a {@link ReentrantLock}, so methods of this
 * class may call each other while already holding it. {@link #tasksCondition} is signalled whenever
 * the set of potentially processable tasks may have changed.
 */
public final class DefaultTaskManager implements TaskManager {
    private final Time time;
    private final Logger log;
    private final TasksRegistry tasks;
    private final Lock tasksLock = new ReentrantLock();
    private final Condition tasksCondition = tasksLock.newCondition();
    // Ids of tasks that must not be handed out to executors; guarded by tasksLock.
    private final List<TaskId> lockedTasks = new ArrayList<>();
    // Exceptions reported by executors and not yet drained; guarded by tasksLock.
    private final Map<TaskId, StreamsException> uncaughtExceptions = new HashMap<>();
    // Which executor currently owns which task; guarded by tasksLock.
    private final Map<TaskId, TaskExecutor> assignedTasks = new HashMap<>();
    private final TaskExecutionMetadata taskExecutionMetadata;
    private final List<TaskExecutor> taskExecutors;

    /**
     * Default {@link TaskExecutorCreator} that instantiates {@link DefaultTaskExecutor}s.
     */
    public static class DefaultTaskExecutorCreator implements TaskExecutorCreator {
        @Override
        public TaskExecutor create(TaskManager taskManager, String str, Time time, TaskExecutionMetadata taskExecutionMetadata) {
            return new DefaultTaskExecutor(taskManager, str, time, taskExecutionMetadata);
        }
    }

    /**
     * Creates the task manager together with its task executors.
     *
     * @param time                  clock used to evaluate whether a task can make progress
     * @param str                   prefix used for the log context and for the executor names
     * @param tasksRegistry         registry holding the tasks managed by this instance
     * @param taskExecutorCreator   factory used to create each executor
     * @param taskExecutionMetadata metadata consulted to decide whether a task may process or punctuate
     * @param i                     number of executors to create; they are named
     *                              {@code <str>-TaskExecutor-1} to {@code <str>-TaskExecutor-<i>}
     */
    public DefaultTaskManager(Time time, String str, TasksRegistry tasksRegistry, TaskExecutorCreator taskExecutorCreator, TaskExecutionMetadata taskExecutionMetadata, int i) {
        this.log = new LogContext(String.format("%s ", str)).logger(DefaultTaskManager.class);
        this.time = time;
        this.tasks = tasksRegistry;
        this.taskExecutionMetadata = taskExecutionMetadata;
        this.taskExecutors = new ArrayList<>(i);
        for (int executorNumber = 1; executorNumber <= i; executorNumber++) {
            this.taskExecutors.add(
                taskExecutorCreator.create(this, str + "-TaskExecutor-" + executorNumber, time, taskExecutionMetadata));
        }
    }

    /**
     * Assigns the first assignable active task to the given executor.
     *
     * @param taskExecutor the executor requesting a task; must be one created by this manager
     * @return the task now assigned to the executor, or {@code null} if no task is assignable
     * @throws IllegalArgumentException if the executor is not known to this manager
     */
    @Override
    public StreamTask assignNextTask(TaskExecutor taskExecutor) {
        return returnWithTasksLocked(() -> {
            if (!taskExecutors.contains(taskExecutor)) {
                throw new IllegalArgumentException("The requested executor for getting next task to assign is unrecognized");
            }
            for (final Task task : tasks.activeTasks()) {
                if (isAssignable(task)) {
                    assignedTasks.put(task.id(), taskExecutor);
                    log.debug("Assigned task {} to executor {}", task.id(), taskExecutor.name());
                    return (StreamTask) task;
                }
            }
            log.debug("Found no assignable task for executor {}", taskExecutor.name());
            return null;
        });
    }

    /**
     * Blocks the caller until it is signalled via the tasks condition, unless an assignable task
     * already exists or the supplier reports {@code true}, in which case it returns immediately.
     *
     * @param supplier evaluated under the tasks lock; when it yields {@code true} the call does not block
     * @throws InterruptedException if the thread is interrupted while waiting on the condition
     */
    @Override
    public void awaitProcessableTasks(Supplier<Boolean> supplier) throws InterruptedException {
        final boolean interrupted = returnWithTasksLocked(() -> {
            for (final Task task : tasks.activeTasks()) {
                if (isAssignable(task)) {
                    log.debug("Await unblocked: returning early from await since a processable task {} was found", task.id());
                    return false;
                }
            }
            try {
                if (supplier.get()) {
                    log.debug("Not awaiting since shutdown was requested");
                } else {
                    log.debug("Await blocking");
                    tasksCondition.await();
                }
            } catch (final InterruptedException ignored) {
                // A checked exception cannot leave the Supplier lambda; report the interruption
                // through the return value and rethrow once the lock has been released.
                log.debug("Await unblocked: Interrupted while waiting for processable tasks");
                return true;
            }
            log.debug("Await unblocked: Woken up to check for processable tasks");
            return false;
        });
        if (interrupted) {
            throw new InterruptedException();
        }
    }

    /**
     * Wakes up every thread waiting in {@link #awaitProcessableTasks(Supplier)}.
     */
    @Override
    public void signalTaskExecutors() {
        log.debug("Waking up task executors");
        executeWithTasksLocked(tasksCondition::signalAll);
    }

    /**
     * Removes the assignment of a task from the executor that currently owns it and signals waiters.
     *
     * @param streamTask   the task to unassign
     * @param taskExecutor the executor giving the task back
     * @throws IllegalArgumentException if the executor is unknown or does not own the task
     */
    @Override
    public void unassignTask(StreamTask streamTask, TaskExecutor taskExecutor) {
        executeWithTasksLocked(() -> {
            if (!taskExecutors.contains(taskExecutor)) {
                throw new IllegalArgumentException("The requested executor for unassign task is unrecognized");
            }
            final TaskExecutor owner = assignedTasks.get(streamTask.id());
            if (owner == null || owner != taskExecutor) {
                throw new IllegalArgumentException("Task " + streamTask.id() + " is not locked by the executor");
            }
            assignedTasks.remove(streamTask.id());
            log.debug("Unassigned {} from executor {}", streamTask.id(), taskExecutor.name());
            tasksCondition.signalAll();
        });
    }

    /**
     * Marks the given tasks as locked so they are no longer handed out, and asks the executors that
     * currently own any of them to release them.
     *
     * <p>All ids are validated before any state is changed, so a failed call leaves neither the
     * locked-task list modified nor any executor asked to unassign.
     *
     * @param set ids of the tasks to lock
     * @return a future that completes once none of the given tasks is assigned any more, or
     *         completes exceptionally if releasing one of them fails; already completed when
     *         {@code set} is empty
     * @throws IllegalArgumentException if one of the ids is not owned or does not denote an active task
     */
    @Override
    public KafkaFuture<Void> lockTasks(Set<TaskId> set) {
        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
        if (set.isEmpty()) {
            result.complete(null);
            return result;
        }

        return returnWithTasksLocked(() -> {
            // Validate first: throwing after mutating would leave ids locked for a failed request.
            for (final TaskId taskId : set) {
                final Task task = tasks.task(taskId);
                if (task == null) {
                    throw new IllegalArgumentException("Trying to lock task " + taskId + " but it's not owned");
                }
                if (!task.isActive()) {
                    throw new IllegalArgumentException("The locking task " + taskId + " is not an active task");
                }
            }

            lockedTasks.addAll(set);

            // Ids still waiting to be released; the callbacks below may run on other threads,
            // hence the concurrent set.
            final Set<TaskId> remainingTaskIds = new ConcurrentSkipListSet<>(set);
            for (final TaskId taskId : set) {
                if (assignedTasks.containsKey(taskId)) {
                    final TaskExecutor owner = assignedTasks.get(taskId);
                    log.debug("Requesting release of task {} from {}", taskId, owner.name());
                    owner.unassign().whenComplete((releasedTask, error) -> {
                        if (error != null) {
                            result.completeExceptionally(error);
                            return;
                        }
                        assert !assignedTasks.containsKey(taskId);
                        // Compare by value: an equal TaskId need not be the same instance.
                        assert releasedTask == null || taskId.equals(releasedTask.id());
                        remainingTaskIds.remove(taskId);
                        if (remainingTaskIds.isEmpty()) {
                            result.complete(null);
                        }
                    });
                } else {
                    remainingTaskIds.remove(taskId);
                    if (remainingTaskIds.isEmpty()) {
                        result.complete(null);
                    }
                }
            }
            return result;
        });
    }

    /**
     * Locks every active task currently in the registry.
     *
     * @return the future returned by {@link #lockTasks(Set)} for the ids of all active tasks
     */
    @Override
    public KafkaFuture<Void> lockAllTasks() {
        return returnWithTasksLocked(() -> lockTasks(activeTaskIds()));
    }

    /**
     * Removes the given ids from the locked-task list and signals waiters. Does nothing for an
     * empty set.
     *
     * @param set ids of the tasks to unlock
     */
    @Override
    public void unlockTasks(Set<TaskId> set) {
        if (set.isEmpty()) {
            return;
        }
        executeWithTasksLocked(() -> {
            lockedTasks.removeAll(set);
            log.debug("Waking up task executors");
            tasksCondition.signalAll();
        });
    }

    /**
     * Unlocks every active task currently in the registry.
     */
    @Override
    public void unlockAllTasks() {
        executeWithTasksLocked(() -> unlockTasks(activeTaskIds()));
    }

    /**
     * Adds the given tasks to the registry and signals waiters.
     *
     * @param set the tasks to add
     */
    @Override
    public void add(Set<StreamTask> set) {
        executeWithTasksLocked(() -> {
            for (final StreamTask task : set) {
                tasks.addTask(task);
            }
            log.debug("Waking up task executors");
            tasksCondition.signalAll();
        });
        log.info("Added tasks {} to the task manager to process", set);
    }

    /**
     * Removes a task from the registry. The task must be locked, unassigned and owned.
     *
     * @param taskId id of the task to remove
     * @throws IllegalArgumentException if the task is still assigned, is not locked, or is not in the registry
     */
    @Override
    public void remove(TaskId taskId) {
        executeWithTasksLocked(() -> {
            if (assignedTasks.containsKey(taskId)) {
                throw new IllegalArgumentException("The task to remove is still assigned to executors");
            }
            if (!lockedTasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not locked yet by the task manager");
            }
            if (!tasks.contains(taskId)) {
                throw new IllegalArgumentException("The task to remove is not owned by the task manager");
            }
            tasks.removeTask(tasks.task(taskId));
        });
        log.info("Removed task {} from the task manager", taskId);
    }

    /**
     * Returns read-only views of all active tasks in the registry.
     *
     * @return a new set with one {@link ReadOnlyTask} per active task
     */
    @Override
    public Set<ReadOnlyTask> getTasks() {
        return returnWithTasksLocked(() ->
            tasks.activeTasks().stream().map(ReadOnlyTask::new).collect(Collectors.toSet()));
    }

    /**
     * Records an uncaught exception for a task that is currently assigned.
     *
     * @param streamsException the exception to record
     * @param taskId           id of the task that raised it
     * @throws IllegalArgumentException if the task is not assigned, or an exception is already
     *                                  recorded for it
     */
    @Override
    public void setUncaughtException(StreamsException streamsException, TaskId taskId) {
        executeWithTasksLocked(() -> {
            if (!assignedTasks.containsKey(taskId)) {
                throw new IllegalArgumentException("An uncaught exception can only be set as long as the task is still assigned");
            }
            if (uncaughtExceptions.containsKey(taskId)) {
                throw new IllegalArgumentException("The uncaught exception must be cleared before restarting processing");
            }
            uncaughtExceptions.put(taskId, streamsException);
        });
        log.info("Set an uncaught exception of type {} for task {}, with error message: {}",
            streamsException.getClass().getName(), taskId, streamsException.getMessage());
    }

    /**
     * Returns all recorded uncaught exceptions and clears them.
     *
     * @return a new map from task id to the exception recorded for it; empty if none were recorded
     */
    @Override
    public Map<TaskId, RuntimeException> drainUncaughtExceptions() {
        final Map<TaskId, RuntimeException> drained = returnWithTasksLocked(() -> {
            final Map<TaskId, RuntimeException> snapshot = new HashMap<>(uncaughtExceptions);
            uncaughtExceptions.clear();
            return snapshot;
        });
        if (!drained.isEmpty()) {
            log.debug("Drained {} uncaught exceptions", drained.size());
        }
        return drained;
    }

    /**
     * Tells whether an uncaught exception is currently recorded for the given task.
     *
     * @param taskId id of the task to check
     * @return {@code true} if an exception is recorded and has not been drained yet
     */
    @Override
    public boolean hasUncaughtException(TaskId taskId) {
        return returnWithTasksLocked(() -> uncaughtExceptions.containsKey(taskId));
    }

    /** Runs the action while holding the tasks lock, releasing it even if the action throws. */
    private void executeWithTasksLocked(Runnable action) {
        tasksLock.lock();
        try {
            action.run();
        } finally {
            tasksLock.unlock();
        }
    }

    /** Evaluates the supplier while holding the tasks lock, releasing it even if the supplier throws. */
    private <T> T returnWithTasksLocked(Supplier<T> action) {
        tasksLock.lock();
        try {
            return action.get();
        } finally {
            tasksLock.unlock();
        }
    }

    /**
     * Tells whether a task may be handed to an executor right now: it is not assigned, not locked,
     * can make progress and has no recorded uncaught exception. Callers hold the tasks lock.
     */
    private boolean isAssignable(Task task) {
        return !assignedTasks.containsKey(task.id())
            && !lockedTasks.contains(task.id())
            && canProgress((StreamTask) task, time.milliseconds())
            && !hasUncaughtException(task.id());
    }

    /** Collects the ids of all active tasks currently in the registry. */
    private Set<TaskId> activeTaskIds() {
        return tasks.activeTasks().stream().map(Task::id).collect(Collectors.toSet());
    }

    /** Tells whether the task has something to process or to punctuate at the given time. */
    private boolean canProgress(StreamTask streamTask, long nowMs) {
        final boolean canProcess =
            taskExecutionMetadata.canProcessTask(streamTask, nowMs) && streamTask.isProcessable(nowMs);
        return canProcess
            || (taskExecutionMetadata.canPunctuateTask(streamTask)
                && (streamTask.canPunctuateStreamTime() || streamTask.canPunctuateSystemTime()));
    }

    /**
     * Starts every task executor created by this manager.
     */
    @Override
    public void startTaskExecutors() {
        for (final TaskExecutor executor : taskExecutors) {
            executor.start();
        }
    }

    /**
     * Requests shutdown of all executors, wakes up any that are waiting, and then waits for each
     * one in turn.
     *
     * @param duration timeout passed to each executor's {@code awaitShutdown} call
     */
    @Override
    public void shutdown(Duration duration) {
        for (final TaskExecutor executor : taskExecutors) {
            executor.requestShutdown();
        }
        // Executors blocked in awaitProcessableTasks must wake up to observe the shutdown request.
        signalTaskExecutors();
        for (final TaskExecutor executor : taskExecutors) {
            executor.awaitShutdown(duration);
        }
    }
}
