package org.apache.storm.daemon.worker;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;
import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.StormTimer;
import org.apache.storm.cluster.IStateStorage;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.cluster.VersionedData;
import org.apache.storm.daemon.StormCommon;
import org.apache.storm.daemon.supervisor.AdvancedFSOps;
import org.apache.storm.daemon.worker.BackPressureTracker;
import org.apache.storm.executor.IRunningExecutor;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.Credentials;
import org.apache.storm.generated.DebugOptions;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.StormBase;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.generated.TopologyStatus;
import org.apache.storm.grouping.LoadMapping;
import org.apache.storm.hooks.IWorkerHook;
import org.apache.storm.messaging.ConnectionWithStatus;
import org.apache.storm.messaging.DeserializingConnectionCallback;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.IContext;
import org.apache.storm.messaging.TransportFactory;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.metrics2.StormMetricRegistry;
import org.apache.storm.policy.IWaitStrategy;
import org.apache.storm.security.auth.IAutoCredentials;
import org.apache.storm.serialization.ITupleSerializer;
import org.apache.storm.serialization.KryoTupleSerializer;
import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
import org.apache.storm.shade.com.google.common.collect.Sets;
import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.task.WorkerUserContext;
import org.apache.storm.tuple.AddressedTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.JCQueue;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.SupervisorIfaceFactory;
import org.apache.storm.utils.ThriftTopologyUtils;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/worker/WorkerState.class */
public class WorkerState {
    private static final long LOAD_REFRESH_INTERVAL_MS = 5000;
    private static final int RESEND_BACKPRESSURE_SIZE = 10000;
    final Map<String, Object> conf;
    final IContext mqContext;
    final IConnection receiver;
    final String topologyId;
    final String assignmentId;
    private final Supplier<SupervisorIfaceFactory> supervisorIfaceSupplier;
    final int port;
    final String workerId;
    final IStateStorage stateStorage;
    final IStormClusterState stormClusterState;
    final CountDownLatch isWorkerActive;
    final AtomicBoolean isTopologyActive;
    final AtomicReference<Map<String, DebugOptions>> stormComponentToDebug;
    final Set<List<Long>> localExecutors;
    final ArrayList<Integer> localTaskIds;
    final Map<String, Object> topologyConf;
    final StormTopology topology;
    final StormTopology systemTopology;
    final Map<Integer, String> taskToComponent;
    final Map<String, Map<String, Fields>> componentToStreamToFields;
    final Map<String, List<Integer>> componentToSortedTasks;
    final ConcurrentMap<String, Long> blobToLastKnownVersion;
    final ReentrantReadWriteLock endpointSocketLock;
    final AtomicReference<Map<Integer, NodeInfo>> cachedTaskToNodePort;
    final AtomicReference<Map<String, String>> cachedNodeToHost;
    final AtomicReference<Map<NodeInfo, IConnection>> cachedNodeToPortSocket;
    final Map<List<Long>, JCQueue> executorReceiveQueueMap;
    final Map<Integer, JCQueue> taskToExecutorQueue;
    final Runnable suicideCallback;
    final Utils.UptimeComputer uptime;
    final Map<String, Object> defaultSharedResources;
    final Map<String, Object> userSharedResources;
    final LoadMapping loadMapping;
    final AtomicReference<Map<String, VersionedData<Assignment>>> assignmentVersions;
    private final WorkerTransfer workerTransfer;
    private final BackPressureTracker bpTracker;
    private final List<IWorkerHook> deserializedWorkerHooks;
    private final Set<Integer> outboundTasks;
    private final boolean trySerializeLocal;
    private final Collection<IAutoCredentials> autoCredentials;
    private final AtomicReference<Credentials> credentialsAtom;
    private final StormMetricRegistry metricRegistry;
    private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class);
    private static long dropCount = 0;
    final Map<Integer, JCQueue> localReceiveQueues = new HashMap();
    final StormTimer heartbeatTimer = mkHaltingTimer("heartbeat-timer");
    final StormTimer refreshLoadTimer = mkHaltingTimer("refresh-load-timer");
    final StormTimer refreshConnectionsTimer = mkHaltingTimer("refresh-connections-timer");
    final StormTimer refreshCredentialsTimer = mkHaltingTimer("refresh-credentials-timer");
    final StormTimer checkForUpdatedBlobsTimer = mkHaltingTimer("check-for-updated-blobs-timer");
    final StormTimer resetLogLevelsTimer = mkHaltingTimer("reset-log-levels-timer");
    final StormTimer refreshActiveTimer = mkHaltingTimer("refresh-active-timer");
    final StormTimer executorHeartbeatTimer = mkHaltingTimer("executor-heartbeat-timer");
    final StormTimer flushTupleTimer = mkHaltingTimer("flush-tuple-timer");
    final StormTimer userTimer = mkHaltingTimer(Constants.USER_TIMER);
    final StormTimer backPressureCheckTimer = mkHaltingTimer("backpressure-check-timer");
    private final AtomicLong nextLoadUpdate = new AtomicLong(0);

    /* loaded from: input_file:org/apache/storm/daemon/worker/WorkerState$ILocalTransferCallback.class */
    public interface ILocalTransferCallback {
        void transfer(ArrayList<AddressedTuple> arrayList);
    }

    public WorkerState(Map<String, Object> map, IContext iContext, String str, String str2, Supplier<SupervisorIfaceFactory> supplier, int i, String str3, Map<String, Object> map2, IStateStorage iStateStorage, IStormClusterState iStormClusterState, Collection<IAutoCredentials> collection, StormMetricRegistry stormMetricRegistry, Credentials credentials) throws IOException, InvalidTopologyException {
        this.metricRegistry = stormMetricRegistry;
        this.autoCredentials = collection;
        this.credentialsAtom = new AtomicReference<>(credentials);
        this.conf = map;
        this.supervisorIfaceSupplier = supplier;
        this.mqContext = null != iContext ? iContext : TransportFactory.makeContext(map2, stormMetricRegistry);
        this.topologyId = str;
        this.assignmentId = str2;
        this.port = i;
        this.workerId = str3;
        this.stateStorage = iStateStorage;
        this.stormClusterState = iStormClusterState;
        this.localExecutors = new HashSet(readWorkerExecutors(str2, i, getLocalAssignment(this.stormClusterState, str)));
        this.isWorkerActive = new CountDownLatch(1);
        this.isTopologyActive = new AtomicBoolean(false);
        this.stormComponentToDebug = new AtomicReference<>();
        this.topology = ConfigUtils.readSupervisorTopology(map, str, AdvancedFSOps.make(map));
        this.taskToComponent = StormCommon.stormTaskInfo(this.topology, map2);
        this.executorReceiveQueueMap = mkReceiveQueueMap(map2, this.localExecutors, this.taskToComponent);
        this.localTaskIds = new ArrayList<>();
        this.taskToExecutorQueue = new HashMap();
        this.blobToLastKnownVersion = new ConcurrentHashMap();
        for (Map.Entry<List<Long>, JCQueue> entry : this.executorReceiveQueueMap.entrySet()) {
            List<Integer> executorIdToTasks = StormCommon.executorIdToTasks(entry.getKey());
            Iterator<Integer> it = executorIdToTasks.iterator();
            while (it.hasNext()) {
                this.taskToExecutorQueue.put(it.next(), entry.getValue());
            }
            this.localTaskIds.addAll(executorIdToTasks);
        }
        Collections.sort(this.localTaskIds);
        this.topologyConf = map2;
        this.systemTopology = StormCommon.systemTopology(map2, this.topology);
        this.componentToStreamToFields = new HashMap();
        for (String str4 : ThriftTopologyUtils.getComponentIds(this.systemTopology)) {
            HashMap hashMap = new HashMap();
            for (Map.Entry<String, StreamInfo> entry2 : ThriftTopologyUtils.getComponentCommon(this.systemTopology, str4).get_streams().entrySet()) {
                hashMap.put(entry2.getKey(), new Fields(entry2.getValue().get_output_fields()));
            }
            this.componentToStreamToFields.put(str4, hashMap);
        }
        this.componentToSortedTasks = Utils.reverseMap(this.taskToComponent);
        this.componentToSortedTasks.values().forEach(Collections::sort);
        this.endpointSocketLock = new ReentrantReadWriteLock();
        this.cachedNodeToPortSocket = new AtomicReference<>(new HashMap());
        this.cachedTaskToNodePort = new AtomicReference<>(new HashMap());
        this.cachedNodeToHost = new AtomicReference<>(new HashMap());
        this.suicideCallback = Utils.mkSuicideFn();
        this.uptime = Utils.makeUptimeComputer();
        this.defaultSharedResources = makeDefaultResources();
        this.userSharedResources = makeUserResources();
        this.loadMapping = new LoadMapping();
        this.assignmentVersions = new AtomicReference<>(new HashMap());
        this.outboundTasks = workerOutboundTasks();
        this.trySerializeLocal = map2.containsKey(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE) && ((Boolean) map2.get(Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE)).booleanValue();
        if (this.trySerializeLocal) {
            LOG.warn("WILL TRY TO SERIALIZE ALL TUPLES (Turn off {} for production", Config.TOPOLOGY_TESTING_ALWAYS_TRY_SERIALIZE);
        }
        this.workerTransfer = new WorkerTransfer(this, map2, getMaxTaskId(this.componentToSortedTasks));
        this.bpTracker = new BackPressureTracker(str3, this.taskToExecutorQueue, stormMetricRegistry, this.taskToComponent);
        this.deserializedWorkerHooks = deserializeWorkerHooks();
        LOG.info("Registering IConnectionCallbacks for {}:{}", str2, Integer.valueOf(i));
        this.receiver = this.mqContext.bind(str, i, new DeserializingConnectionCallback(map2, getWorkerTopologyContext(), this::transferLocalBatch), () -> {
            BackPressureStatus currStatus = this.bpTracker.getCurrStatus();
            LOG.info("Sending BackPressure status to new client. BPStatus: {}", currStatus);
            return currStatus;
        });
    }

    public static boolean isConnectionReady(IConnection iConnection) {
        return !(iConnection instanceof ConnectionWithStatus) || ((ConnectionWithStatus) iConnection).status() == ConnectionWithStatus.Status.Ready;
    }

    private static int getMaxTaskId(Map<String, List<Integer>> map) {
        int intValue;
        int i = -1;
        for (List<Integer> list : map.values()) {
            if (!list.isEmpty() && (intValue = list.stream().max((v0, v1) -> {
                return v0.compareTo(v1);
            }).get().intValue()) > i) {
                i = intValue;
            }
        }
        return i;
    }

    public List<IWorkerHook> getDeserializedWorkerHooks() {
        return this.deserializedWorkerHooks;
    }

    public Map<String, Object> getConf() {
        return this.conf;
    }

    public IConnection getReceiver() {
        return this.receiver;
    }

    public String getTopologyId() {
        return this.topologyId;
    }

    public int getPort() {
        return this.port;
    }

    public String getWorkerId() {
        return this.workerId;
    }

    public IStateStorage getStateStorage() {
        return this.stateStorage;
    }

    public CountDownLatch getIsWorkerActive() {
        return this.isWorkerActive;
    }

    public AtomicBoolean getIsTopologyActive() {
        return this.isTopologyActive;
    }

    public AtomicReference<Map<String, DebugOptions>> getStormComponentToDebug() {
        return this.stormComponentToDebug;
    }

    public Set<List<Long>> getLocalExecutors() {
        return this.localExecutors;
    }

    public List<Integer> getLocalTaskIds() {
        return this.localTaskIds;
    }

    public Map<Integer, JCQueue> getLocalReceiveQueues() {
        return this.localReceiveQueues;
    }

    public Map<String, Object> getTopologyConf() {
        return this.topologyConf;
    }

    public StormTopology getTopology() {
        return this.topology;
    }

    public StormTopology getSystemTopology() {
        return this.systemTopology;
    }

    public Map<Integer, String> getTaskToComponent() {
        return this.taskToComponent;
    }

    public Map<String, Map<String, Fields>> getComponentToStreamToFields() {
        return this.componentToStreamToFields;
    }

    public Map<String, List<Integer>> getComponentToSortedTasks() {
        return this.componentToSortedTasks;
    }

    public Map<String, Long> getBlobToLastKnownVersion() {
        return this.blobToLastKnownVersion;
    }

    public AtomicReference<Map<NodeInfo, IConnection>> getCachedNodeToPortSocket() {
        return this.cachedNodeToPortSocket;
    }

    public Map<List<Long>, JCQueue> getExecutorReceiveQueueMap() {
        return this.executorReceiveQueueMap;
    }

    public Runnable getSuicideCallback() {
        return this.suicideCallback;
    }

    public Utils.UptimeComputer getUptime() {
        return this.uptime;
    }

    public Map<String, Object> getDefaultSharedResources() {
        return this.defaultSharedResources;
    }

    public Map<String, Object> getUserSharedResources() {
        return this.userSharedResources;
    }

    public LoadMapping getLoadMapping() {
        return this.loadMapping;
    }

    public AtomicReference<Map<String, VersionedData<Assignment>>> getAssignmentVersions() {
        return this.assignmentVersions;
    }

    public StormTimer getUserTimer() {
        return this.userTimer;
    }

    public Utils.SmartThread makeTransferThread() {
        return this.workerTransfer.makeTransferThread();
    }

    public void suicideIfLocalAssignmentsChanged(Assignment assignment) {
        boolean z = false;
        if (assignment != null) {
            HashSet hashSet = new HashSet(readWorkerExecutors(this.assignmentId, this.port, assignment));
            if (!this.localExecutors.equals(hashSet)) {
                LOG.info("Found conflicting assignments. We shouldn't be alive! Assigned: " + hashSet + ", Current: " + this.localExecutors);
                z = true;
            }
        } else {
            LOG.info("Assigment is null. We should not be alive!");
            z = true;
        }
        if (z) {
            if (ConfigUtils.isLocalMode(this.conf)) {
                LOG.info("Local worker tried to commit suicide!");
            } else {
                this.suicideCallback.run();
            }
        }
    }

    public void refreshConnections() {
        Assignment assignment = null;
        try {
            assignment = getLocalAssignment(this.stormClusterState, this.topologyId);
        } catch (Exception e) {
            LOG.warn("Failed to read assignment. This should only happen when topology is shutting down.", e);
        }
        suicideIfLocalAssignmentsChanged(assignment);
        HashSet hashSet = new HashSet();
        HashMap hashMap = new HashMap();
        if (null != assignment) {
            for (Map.Entry<Integer, NodeInfo> entry : StormCommon.taskToNodeport(assignment.get_executor_node_port()).entrySet()) {
                Integer key = entry.getKey();
                if (this.outboundTasks.contains(key)) {
                    hashMap.put(key, entry.getValue());
                    if (!this.localTaskIds.contains(key)) {
                        hashSet.add(entry.getValue());
                    }
                }
            }
        }
        Set<NodeInfo> keySet = this.cachedNodeToPortSocket.get().keySet();
        Sets.SetView difference = Sets.difference(hashSet, keySet);
        Sets.SetView difference2 = Sets.difference(keySet, hashSet);
        Map<String, String> map = assignment != null ? assignment.get_node_host() : null;
        this.cachedNodeToPortSocket.getAndUpdate(map2 -> {
            HashMap hashMap2 = new HashMap(map2);
            Iterator it = difference.iterator();
            while (it.hasNext()) {
                NodeInfo nodeInfo = (NodeInfo) it.next();
                hashMap2.put(nodeInfo, this.mqContext.connect(this.topologyId, (String) map.get(nodeInfo.get_node()), nodeInfo.get_port().iterator().next().intValue(), this.workerTransfer.getRemoteBackPressureStatus()));
            }
            return hashMap2;
        });
        try {
            this.endpointSocketLock.writeLock().lock();
            this.cachedTaskToNodePort.set(hashMap);
            this.endpointSocketLock.writeLock().unlock();
            if (map != null) {
                this.cachedNodeToHost.set(map);
            } else {
                this.cachedNodeToHost.set(new HashMap());
            }
            Iterator it = difference2.iterator();
            while (it.hasNext()) {
                this.cachedNodeToPortSocket.get().get((NodeInfo) it.next()).close();
            }
            this.cachedNodeToPortSocket.getAndUpdate(map3 -> {
                HashMap hashMap2 = new HashMap(map3);
                Objects.requireNonNull(hashMap2);
                difference2.forEach((v1) -> {
                    r1.remove(v1);
                });
                return hashMap2;
            });
        } catch (Throwable th) {
            this.endpointSocketLock.writeLock().unlock();
            throw th;
        }
    }

    public void refreshStormActive() {
        refreshStormActive(() -> {
            this.refreshActiveTimer.schedule(0, this::refreshStormActive);
        });
    }

    public void refreshStormActive(Runnable runnable) {
        StormBase stormBase = this.stormClusterState.stormBase(this.topologyId, runnable);
        this.isTopologyActive.set(null != stormBase && stormBase.get_status() == TopologyStatus.ACTIVE);
        if (null != stormBase) {
            HashMap hashMap = new HashMap(stormBase.get_component_debug());
            for (DebugOptions debugOptions : hashMap.values()) {
                if (!debugOptions.is_set_samplingpct()) {
                    debugOptions.set_samplingpct(10.0d);
                }
                if (!debugOptions.is_set_enable()) {
                    debugOptions.set_enable(false);
                }
            }
            this.stormComponentToDebug.set(hashMap);
            LOG.debug("Events debug options {}", this.stormComponentToDebug.get());
        }
    }

    public void refreshLoad(List<IRunningExecutor> list) {
        Sets.SetView difference = Sets.difference(new HashSet(this.outboundTasks), new HashSet(this.localTaskIds));
        HashMap hashMap = new HashMap();
        for (IRunningExecutor iRunningExecutor : list) {
            hashMap.put(Integer.valueOf(iRunningExecutor.getExecutorId().get(0).intValue()), Double.valueOf(iRunningExecutor.getReceiveQueue().getQueueLoad()));
        }
        HashMap hashMap2 = new HashMap();
        this.cachedNodeToPortSocket.get().values().stream().forEach(iConnection -> {
            hashMap2.putAll(iConnection.getLoad(difference));
        });
        this.loadMapping.setLocal(hashMap);
        this.loadMapping.setRemote(hashMap2);
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        if (valueOf.longValue() > this.nextLoadUpdate.get()) {
            this.receiver.sendLoadMetrics(hashMap);
            this.nextLoadUpdate.set(valueOf.longValue() + LOAD_REFRESH_INTERVAL_MS);
        }
    }

    public void refreshBackPressureStatus() {
        LOG.debug("Checking for change in Backpressure status on worker's tasks");
        if (this.bpTracker.refreshBpTaskList()) {
            this.receiver.sendBackPressureStatus(this.bpTracker.getCurrStatus());
        }
    }

    public void activateWorkerWhenAllConnectionsReady() {
        int i = 1;
        this.refreshActiveTimer.schedule(0, () -> {
            if (!areAllConnectionsReady()) {
                this.refreshActiveTimer.schedule(i, () -> {
                    activateWorkerWhenAllConnectionsReady();
                }, false, 0);
            } else {
                LOG.info("All connections are ready for worker {}:{} with id {}", new Object[]{this.assignmentId, Integer.valueOf(this.port), this.workerId});
                this.isWorkerActive.countDown();
            }
        });
    }

    public boolean tryTransferRemote(AddressedTuple addressedTuple, Queue<AddressedTuple> queue, ITupleSerializer iTupleSerializer) {
        return this.workerTransfer.tryTransferRemote(addressedTuple, queue, iTupleSerializer);
    }

    public void flushRemotes() throws InterruptedException {
        this.workerTransfer.flushRemotes();
    }

    public boolean tryFlushRemotes() {
        return this.workerTransfer.tryFlushRemotes();
    }

    private void transferLocalBatch(ArrayList<AddressedTuple> arrayList) {
        for (int i = 0; i < arrayList.size(); i++) {
            AddressedTuple addressedTuple = arrayList.get(i);
            JCQueue jCQueue = this.taskToExecutorQueue.get(Integer.valueOf(addressedTuple.dest));
            if (!jCQueue.isEmptyOverflow() || !jCQueue.tryPublish(addressedTuple)) {
                int overflowCount = jCQueue.getOverflowCount();
                BackPressureTracker.BackpressureState backpressureState = this.bpTracker.getBackpressureState(Integer.valueOf(addressedTuple.dest));
                if (this.bpTracker.recordBackPressure(backpressureState)) {
                    this.receiver.sendBackPressureStatus(this.bpTracker.getCurrStatus());
                    this.bpTracker.setLastOverflowCount(backpressureState, overflowCount);
                } else if (overflowCount - this.bpTracker.getLastOverflowCount(backpressureState) > RESEND_BACKPRESSURE_SIZE) {
                    BackPressureStatus currStatus = this.bpTracker.getCurrStatus();
                    this.receiver.sendBackPressureStatus(currStatus);
                    this.bpTracker.setLastOverflowCount(backpressureState, overflowCount);
                    LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", Integer.valueOf(overflowCount), Long.valueOf(currStatus.id));
                }
                if (!jCQueue.tryPublishToOverflow(addressedTuple)) {
                    dropMessage(addressedTuple, jCQueue);
                }
            }
        }
    }

    private void dropMessage(AddressedTuple addressedTuple, JCQueue jCQueue) {
        dropCount++;
        jCQueue.recordMsgDrop();
        LOG.warn("Dropping message as overflow threshold has reached for Q = {}. OverflowCount = {}. Total Drop Count= {}, Dropped Message : {}", new Object[]{jCQueue.getQueueName(), Integer.valueOf(jCQueue.getOverflowCount()), Long.valueOf(dropCount), addressedTuple});
    }

    public void checkSerialize(KryoTupleSerializer kryoTupleSerializer, AddressedTuple addressedTuple) {
        if (this.trySerializeLocal) {
            kryoTupleSerializer.serialize(addressedTuple.getTuple());
        }
    }

    public final WorkerTopologyContext getWorkerTopologyContext() {
        try {
            return new WorkerTopologyContext(this.systemTopology, this.topologyConf, this.taskToComponent, this.componentToSortedTasks, this.componentToStreamToFields, this.topologyId, ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(this.conf, this.topologyId)), ConfigUtils.workerPidsRoot(this.conf, this.topologyId), Integer.valueOf(this.port), this.localTaskIds, this.defaultSharedResources, this.userSharedResources, this.cachedTaskToNodePort, this.assignmentId, this.cachedNodeToHost);
        } catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    public final WorkerUserContext getWorkerUserContext() {
        try {
            return new WorkerUserContext(this.systemTopology, this.topologyConf, this.taskToComponent, this.componentToSortedTasks, this.componentToStreamToFields, this.topologyId, ConfigUtils.supervisorStormResourcesPath(ConfigUtils.supervisorStormDistRoot(this.conf, this.topologyId)), ConfigUtils.workerPidsRoot(this.conf, this.topologyId), Integer.valueOf(this.port), this.localTaskIds, this.defaultSharedResources, this.userSharedResources, this.cachedTaskToNodePort, this.assignmentId, this.cachedNodeToHost);
        } catch (IOException e) {
            throw Utils.wrapInRuntime(e);
        }
    }

    private List<IWorkerHook> deserializeWorkerHooks() {
        ArrayList arrayList = new ArrayList();
        if (this.topology.is_set_worker_hooks()) {
            Iterator<ByteBuffer> it = this.topology.get_worker_hooks().iterator();
            while (it.hasNext()) {
                arrayList.add((IWorkerHook) Utils.javaDeserialize(Utils.toByteArray(it.next()), IWorkerHook.class));
            }
        }
        return arrayList;
    }

    public void runWorkerStartHooks() {
        WorkerUserContext workerUserContext = getWorkerUserContext();
        Iterator<IWorkerHook> it = getDeserializedWorkerHooks().iterator();
        while (it.hasNext()) {
            it.next().start(this.topologyConf, workerUserContext);
        }
    }

    public void runWorkerShutdownHooks() {
        Iterator<IWorkerHook> it = getDeserializedWorkerHooks().iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
    }

    public void closeResources() {
        LOG.info("Shutting down default resources");
        ((ExecutorService) this.defaultSharedResources.get(WorkerTopologyContext.SHARED_EXECUTOR)).shutdownNow();
        LOG.info("Shut down default resources");
    }

    public boolean areAllConnectionsReady() {
        return ((Boolean) this.cachedNodeToPortSocket.get().values().stream().map(WorkerState::isConnectionReady).reduce((bool, bool2) -> {
            return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
        }).orElse(true)).booleanValue();
    }

    public Collection<IAutoCredentials> getAutoCredentials() {
        return this.autoCredentials;
    }

    public Credentials getCredentials() {
        return this.credentialsAtom.get();
    }

    public void setCredentials(Credentials credentials) {
        this.credentialsAtom.set(credentials);
    }

    private List<List<Long>> readWorkerExecutors(String str, int i, Assignment assignment) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(Constants.SYSTEM_EXECUTOR_ID);
        for (Map.Entry<List<Long>, NodeInfo> entry : assignment.get_executor_node_port().entrySet()) {
            NodeInfo value = entry.getValue();
            if (value.get_node().equals(str) && value.get_port().iterator().next().longValue() == i) {
                arrayList.add(entry.getKey());
            }
        }
        return arrayList;
    }

    private Assignment getLocalAssignment(IStormClusterState iStormClusterState, String str) {
        try {
            SupervisorIfaceFactory supervisorIfaceFactory = this.supervisorIfaceSupplier.get();
            try {
                Assignment localAssignmentForStorm = supervisorIfaceFactory.getIface().getLocalAssignmentForStorm(str);
                if (supervisorIfaceFactory != null) {
                    supervisorIfaceFactory.close();
                }
                return localAssignmentForStorm;
            } finally {
            }
        } catch (Throwable th) {
            Assignment remoteAssignmentInfo = iStormClusterState.remoteAssignmentInfo(str, null);
            if (remoteAssignmentInfo == null) {
                throw new RuntimeException("Failed to read worker assignment. Supervisor client threw exception, and assignment in Zookeeper was null", th);
            }
            return remoteAssignmentInfo;
        }
    }

    private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> map, Set<List<Long>> set, Map<Integer, String> map2) {
        Integer num = ObjectReader.getInt(map.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
        Integer num2 = ObjectReader.getInt(map.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
        Integer num3 = ObjectReader.getInt(map.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));
        if (num2.intValue() > num.intValue() / 2) {
            throw new IllegalArgumentException("topology.producer.batch.size:" + num2 + " is greater than half of topology.executor.receive.buffer.size:" + num);
        }
        IWaitStrategy createBackPressureWaitStrategy = IWaitStrategy.createBackPressureWaitStrategy(map);
        HashMap hashMap = new HashMap();
        for (List<Long> list : set) {
            List<Integer> executorIdToTasks = StormCommon.executorIdToTasks(list);
            int intValue = executorIdToTasks.get(0).intValue();
            hashMap.put(list, new JCQueue("receive-queue" + list.toString(), "receive-queue", num.intValue(), num3.intValue(), num2.intValue(), createBackPressureWaitStrategy, getTopologyId(), ((long) intValue) == -1 ? "__system" : map2.get(Integer.valueOf(intValue)), executorIdToTasks, getPort(), this.metricRegistry));
        }
        return hashMap;
    }

    private Map<String, Object> makeDefaultResources() {
        return ImmutableMap.of(WorkerTopologyContext.SHARED_EXECUTOR, Executors.newFixedThreadPool(ObjectReader.getInt(this.conf.get(Config.TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE)).intValue()));
    }

    private Map<String, Object> makeUserResources() {
        return new HashMap();
    }

    private StormTimer mkHaltingTimer(String str) {
        return new StormTimer(str, (thread, th) -> {
            LOG.error("Error when processing event", th);
            Utils.exitProcess(20, "Error when processing an event");
        });
    }

    private Set<Integer> workerOutboundTasks() {
        WorkerTopologyContext workerTopologyContext = getWorkerTopologyContext();
        HashSet hashSet = new HashSet();
        Iterator<Integer> it = this.localTaskIds.iterator();
        while (it.hasNext()) {
            Iterator<Map<String, Grouping>> it2 = workerTopologyContext.getTargets(workerTopologyContext.getComponentId(it.next().intValue())).values().iterator();
            while (it2.hasNext()) {
                hashSet.addAll(it2.next().keySet());
            }
        }
        HashSet hashSet2 = new HashSet();
        for (Map.Entry entry : Utils.reverseMap(this.taskToComponent).entrySet()) {
            if (hashSet.contains(entry.getKey())) {
                hashSet2.addAll((Collection) entry.getValue());
            }
        }
        return hashSet2;
    }

    public Set<Integer> getOutboundTasks() {
        return this.outboundTasks;
    }

    public boolean hasRemoteOutboundTasks() {
        return !Sets.difference(new HashSet(this.outboundTasks), new HashSet(this.localTaskIds)).isEmpty();
    }

    public boolean isSingleWorker() {
        return Sets.difference(getTaskToComponent().keySet(), new HashSet(this.localTaskIds)).isEmpty();
    }

    public void haltWorkerTransfer() {
        this.workerTransfer.haltTransferThd();
    }

    public JCQueue getTransferQueue() {
        return this.workerTransfer.getTransferQueue();
    }

    public StormMetricRegistry getMetricRegistry() {
        return this.metricRegistry;
    }
}
