/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.command;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.util.Preconditions;

/**
 * {@link TestCommandDispatcher} implementation that keeps its subscribers in memory,
 * grouped by operator ID.
 *
 * <p>Subscribers are stored in a {@link ConcurrentHashMap} whose values are
 * {@link CopyOnWriteArrayList}s. An executor that has been handed a terminal command
 * (see {@code TestCommand#isTerminal()}) is dropped from the subscriber lists afterwards,
 * both when the command was dispatched to one operator and when it was broadcast.
 */
class TestCommandDispatcherImpl
implements TestCommandDispatcher {
    /** Registered executors, keyed by the operator ID they subscribed under. */
    private final Map<String, List<TestCommandDispatcher.CommandExecutor>> subscribers = new ConcurrentHashMap<>();

    TestCommandDispatcherImpl() {
    }

    /**
     * Registers {@code executor} under {@code operatorID}, creating the per-operator
     * list on first use.
     *
     * @param executor executor to register
     * @param operatorID operator ID the executor is registered under
     */
    @Override
    public void subscribe(TestCommandDispatcher.CommandExecutor executor, String operatorID) {
        this.subscribers.computeIfAbsent(operatorID, ign -> new CopyOnWriteArrayList<>()).add(executor);
    }

    /**
     * Sends {@code testCommand} to the executors subscribed under {@code operatorID}.
     * Executors that received a terminal command are removed from that operator's list.
     *
     * @param testCommand command to execute
     * @param scope whether every subscribed executor or only the first one runs the command
     * @param operatorID operator whose executors are targeted
     */
    @Override
    public void dispatch(TestCommand testCommand, TestCommandDispatcher.TestCommandScope scope, String operatorID) {
        List<TestCommandDispatcher.CommandExecutor> executors = this.subscribers.getOrDefault(operatorID, Collections.emptyList());
        // executeInternal rejects an empty list, so removeAll only ever runs on a registered list.
        executors.removeAll(this.executeInternal(testCommand, scope, executors));
    }

    /**
     * Sends {@code testCommand} to the executors of all operators. Executors that received
     * a terminal command are removed from every operator's subscriber list.
     *
     * @param testCommand command to execute
     * @param scope whether every subscribed executor or only the first one runs the command
     */
    @Override
    public void broadcast(TestCommand testCommand, TestCommandDispatcher.TestCommandScope scope) {
        List<TestCommandDispatcher.CommandExecutor> snapshot = this.subscribers.values().stream()
                .flatMap(Collection::stream)
                .collect(Collectors.toList());
        Collection<TestCommandDispatcher.CommandExecutor> terminated = this.executeInternal(testCommand, scope, snapshot);
        // The snapshot is a temporary copy; removals must be applied to the registered lists.
        if (!terminated.isEmpty()) {
            this.subscribers.values().forEach(registered -> registered.removeAll(terminated));
        }
    }

    /**
     * Removes {@code commandExecutor} from the list registered under {@code operatorID};
     * does nothing when no list exists for that operator.
     *
     * @param operatorID operator ID the executor was registered under
     * @param commandExecutor executor to remove
     */
    @Override
    public void unsubscribe(String operatorID, TestCommandDispatcher.CommandExecutor commandExecutor) {
        this.subscribers.getOrDefault(operatorID, Collections.emptyList()).remove(commandExecutor);
    }

    /**
     * Runs {@code command} on the given executors in iteration order, stopping after the
     * first one when {@code scope} is {@code SINGLE_SUBTASK}.
     *
     * <p>An empty {@code executors} list fails the {@link Preconditions#checkState} check.
     *
     * @param command command to execute
     * @param scope execution scope
     * @param executors candidate executors; must not be empty
     * @return the executors that ran {@code command} while it reported itself as terminal;
     *     empty when the command is not terminal
     */
    private Collection<TestCommandDispatcher.CommandExecutor> executeInternal(TestCommand command, TestCommandDispatcher.TestCommandScope scope, List<TestCommandDispatcher.CommandExecutor> executors) {
        Preconditions.checkState(!executors.isEmpty(), "no executors for command: " + command);
        Collection<TestCommandDispatcher.CommandExecutor> terminated = new HashSet<>();
        for (TestCommandDispatcher.CommandExecutor executor : executors) {
            executor.execute(command);
            if (command.isTerminal()) {
                terminated.add(executor);
            }
            if (scope == TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK) {
                break;
            }
        }
        return terminated;
    }
}

