/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.io.StrictlyLocalAssignment;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.io.LocatableInputSplit;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.util.SerializableObject;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;

public class ExecutionJobVertex
implements Serializable {
    private static final long serialVersionUID = 42L;
    private static final Logger LOG = ExecutionGraph.LOG;
    private final SerializableObject stateMonitor = new SerializableObject();
    private final ExecutionGraph graph;
    private final JobVertex jobVertex;
    private final ExecutionVertex[] taskVertices;
    private IntermediateResult[] producedDataSets;
    private final List<IntermediateResult> inputs;
    private final int parallelism;
    private final boolean[] finishedSubtasks;
    private volatile int numSubtasksInFinalState;
    private final SlotSharingGroup slotSharingGroup;
    private final CoLocationGroup coLocationGroup;
    private final InputSplit[] inputSplits;
    private List<LocatableInputSplit>[] inputSplitsPerSubtask;
    private InputSplitAssigner splitAssigner;

    /**
     * Convenience constructor that delegates to the five-argument constructor, using
     * {@link System#currentTimeMillis()} as the creation timestamp.
     *
     * @param graph the execution graph this vertex belongs to
     * @param jobVertex the job vertex this execution job vertex is built from
     * @param defaultParallelism parallelism used when the job vertex reports a non-positive parallelism
     * @param timeout passed through to the created execution vertices
     * @throws JobException if the delegated constructor fails
     */
    public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, FiniteDuration timeout) throws JobException {
        this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
    }

    /**
     * Creates the execution job vertex together with its parallel subtasks, the intermediate
     * results it produces and, if the job vertex has an input split source, its input splits
     * and split assigner.
     *
     * @param graph the execution graph this vertex belongs to; must not be null
     * @param jobVertex the job vertex this execution job vertex is built from; must not be null
     * @param defaultParallelism parallelism used when the job vertex reports a parallelism of zero or less
     * @param timeout passed through to every created {@link ExecutionVertex}
     * @param createTimestamp passed through to every created {@link ExecutionVertex}
     * @throws JobException if a co-location group is set without a slot sharing group, or if
     *         creating the input splits / the split assigner fails for any reason
     * @throws NullPointerException if {@code graph} or {@code jobVertex} is null
     */
    public ExecutionJobVertex(ExecutionGraph graph, JobVertex jobVertex, int defaultParallelism, FiniteDuration timeout, long createTimestamp) throws JobException {
        if (graph == null) {
            throw new NullPointerException("graph must not be null");
        }
        if (jobVertex == null) {
            throw new NullPointerException("jobVertex must not be null");
        }
        this.graph = graph;
        this.jobVertex = jobVertex;

        // A non-positive vertex parallelism means "use the default".
        int vertexParallelism = jobVertex.getParallelism();
        int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
        this.parallelism = numTaskVertices;
        this.taskVertices = new ExecutionVertex[numTaskVertices];
        this.inputs = new ArrayList<IntermediateResult>(jobVertex.getInputs().size());

        this.slotSharingGroup = jobVertex.getSlotSharingGroup();
        this.coLocationGroup = jobVertex.getCoLocationGroup();
        if (this.coLocationGroup != null && this.slotSharingGroup == null) {
            throw new JobException("Vertex uses a co-location constraint without using slot sharing");
        }

        // One IntermediateResult per produced data set of the job vertex.
        this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
        for (int i = 0; i < jobVertex.getProducedDataSets().size(); ++i) {
            IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
            this.producedDataSets[i] = new IntermediateResult(result.getId(), this, numTaskVertices, result.getResultType(), result.getEagerlyDeployConsumers());
        }

        // One ExecutionVertex per parallel subtask.
        for (int i = 0; i < numTaskVertices; ++i) {
            this.taskVertices[i] = new ExecutionVertex(this, i, this.producedDataSets, timeout, createTimestamp);
        }

        // Sanity check: every result must have exactly one partition per subtask.
        for (IntermediateResult ir : this.producedDataSets) {
            if (ir.getNumberOfAssignedPartitions() != this.parallelism) {
                throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
            }
        }

        // Collected in a local so the final field is assigned exactly once, after the try block.
        InputSplit[] createdSplits = null;
        try {
            // The wildcard-typed source cannot accept an InputSplit[]; the cast is erased at
            // runtime and only restores static type-correctness.
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
            if (splitSource != null) {
                createdSplits = splitSource.createInputSplits(numTaskVertices);
                if (createdSplits != null) {
                    if (splitSource instanceof StrictlyLocalAssignment) {
                        // Splits are pinned to subtasks up front.
                        this.inputSplitsPerSubtask = this.computeLocalInputSplitsPerTask(createdSplits);
                        this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
                    } else {
                        this.splitAssigner = splitSource.getInputSplitAssigner(createdSplits);
                    }
                }
            }
        }
        catch (Throwable t) {
            throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
        }
        this.inputSplits = createdSplits;
        this.finishedSubtasks = new boolean[this.parallelism];
    }

    /** Returns the execution graph passed to the constructor. */
    public ExecutionGraph getGraph() {
        return graph;
    }

    /** Returns the job vertex passed to the constructor. */
    public JobVertex getJobVertex() {
        return jobVertex;
    }

    /** Returns the parallelism chosen at construction time (the number of subtasks). */
    public int getParallelism() {
        return parallelism;
    }

    /** Returns the job ID as reported by the owning execution graph. */
    public JobID getJobId() {
        return graph.getJobID();
    }

    /** Returns the ID of the underlying job vertex. */
    public JobVertexID getJobVertexId() {
        return jobVertex.getID();
    }

    /** Returns the internal array of subtask vertices (not a copy). */
    public ExecutionVertex[] getTaskVertices() {
        return taskVertices;
    }

    /**
     * Returns the internal array of produced intermediate results (not a copy).
     * The field is set to {@code null} by {@link #prepareForArchiving()}.
     */
    public IntermediateResult[] getProducedDataSets() {
        return producedDataSets;
    }

    /** Returns the current input split assigner; may be {@code null} if none was created or after archiving. */
    public InputSplitAssigner getSplitAssigner() {
        return splitAssigner;
    }

    /** Returns the slot sharing group taken from the job vertex; may be {@code null}. */
    public SlotSharingGroup getSlotSharingGroup() {
        return slotSharingGroup;
    }

    /** Returns the co-location group taken from the job vertex; may be {@code null}. */
    public CoLocationGroup getCoLocationGroup() {
        return coLocationGroup;
    }

    /** Returns the internal, mutable list of input results filled by {@link #connectToPredecessors(Map)}. */
    public List<IntermediateResult> getInputs() {
        return inputs;
    }

    /** Returns {@code true} once the number of subtasks in a final state equals the parallelism. */
    public boolean isInFinalState() {
        return parallelism == numSubtasksInFinalState;
    }

    /**
     * Counts the subtasks per {@link ExecutionState} (indexed by ordinal) and folds the counts
     * into one state via {@link #getAggregateJobVertexState(int[], int)}.
     *
     * @return the aggregated state of all subtasks
     */
    public ExecutionState getAggregateState() {
        int[] countsPerState = new int[ExecutionState.values().length];
        for (int i = 0; i < taskVertices.length; ++i) {
            ExecutionState state = taskVertices[i].getExecutionState();
            countsPerState[state.ordinal()]++;
        }
        return getAggregateJobVertexState(countsPerState, parallelism);
    }

    /**
     * Wires every input edge of the job vertex to the intermediate result it consumes.
     *
     * <p>For each edge the result is looked up by the edge's source ID, appended to this vertex's
     * input list, a consumer is registered on it, and every subtask is connected to it.
     *
     * @param intermediateDataSets lookup table from data set ID to intermediate result
     * @throws JobException if no intermediate result is registered for an edge's source ID
     */
    public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
        final List<JobEdge> edges = jobVertex.getInputs();
        final boolean debug = LOG.isDebugEnabled();

        if (debug) {
            LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), edges.size()));
        }

        int inputIndex = 0;
        for (JobEdge edge : edges) {
            if (debug) {
                String message;
                if (edge.getSource() != null) {
                    message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName());
                } else {
                    message = String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.", inputIndex, jobVertex.getID(), jobVertex.getName(), edge.getSourceId());
                }
                LOG.debug(message);
            }

            IntermediateResult source = intermediateDataSets.get(edge.getSourceId());
            if (source == null) {
                throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID " + edge.getSourceId());
            }

            inputs.add(source);
            int consumerIndex = source.registerConsumer();
            for (int subtask = 0; subtask < parallelism; ++subtask) {
                taskVertices[subtask].connectSource(inputIndex, source, edge, consumerIndex);
            }
            ++inputIndex;
        }
    }

    /**
     * Schedules all subtasks of this vertex for execution.
     *
     * <p>If splits were pre-assigned per subtask (strictly local assignment), each subtask that
     * owns at least one split is first constrained to an instance on the host of its first split.
     * Instances on the same host are handed out round-robin, so consecutive subtasks on one host
     * get distinct instances until all instances of that host have been used.
     *
     * @param scheduler the scheduler supplying instances and executing the scheduling requests
     * @param queued forwarded unchanged to {@code ExecutionVertex.scheduleForExecution}
     * @throws NoResourceAvailableException if a strictly local subtask targets a host without
     *         any instance (the per-vertex scheduling call may also throw it)
     */
    public void scheduleAll(Scheduler scheduler, boolean queued) throws NoResourceAvailableException {
        ExecutionVertex[] vertices = this.taskVertices;
        if (this.inputSplitsPerSubtask != null) {
            Map<String, List<Instance>> instances = scheduler.getInstancesByHost();
            // host -> index of the instance most recently handed out on that host
            Map<String, Integer> lastUsedIndex = new HashMap<String, Integer>();
            for (int i = 0; i < vertices.length; ++i) {
                List<LocatableInputSplit> splitsForSubtask = this.inputSplitsPerSubtask[i];
                if (splitsForSubtask == null || splitsForSubtask.isEmpty()) {
                    continue;
                }
                String[] hostNames = splitsForSubtask.get(0).getHostnames();
                if (hostNames == null || hostNames.length == 0 || hostNames[0] == null) {
                    continue;
                }
                String host = hostNames[0];
                List<Instance> instancesOnHost = instances.get(host);
                if (instancesOnHost == null || instancesOnHost.isEmpty()) {
                    throw new NoResourceAvailableException("Cannot schedule a strictly local task to host " + host + ". No TaskManager available on that host.");
                }
                // Advance first, then use the advanced value. Using the stale index would give
                // the first instance to both the first and the second subtask on a host.
                Integer previous = lastUsedIndex.get(host);
                int pos = previous == null ? 0 : (previous + 1) % instancesOnHost.size();
                lastUsedIndex.put(host, pos);
                vertices[i].setLocationConstraintHosts(Collections.singletonList(instancesOnHost.get(pos)));
            }
        }
        for (ExecutionVertex ev : vertices) {
            ev.scheduleForExecution(scheduler, queued);
        }
    }

    /** Calls {@code cancel()} on every subtask vertex, in subtask order. */
    public void cancel() {
        ExecutionVertex[] vertices = getTaskVertices();
        for (int i = 0; i < vertices.length; ++i) {
            vertices[i].cancel();
        }
    }

    /**
     * Calls {@code fail(t)} on every subtask vertex, in subtask order.
     *
     * @param t the failure cause handed to each subtask
     */
    public void fail(Throwable t) {
        ExecutionVertex[] vertices = getTaskVertices();
        for (int i = 0; i < vertices.length; ++i) {
            vertices[i].fail(t);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Blocks the calling thread until the number of subtasks in a final state has reached the
     * parallelism. Waits on the state monitor, which {@code subtaskInFinalState} notifies when
     * the last subtask arrives.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitForAllVerticesToReachFinishingState() throws InterruptedException {
        synchronized (stateMonitor) {
            // Loop guards against spurious wake-ups.
            while (parallelism > numSubtasksInFinalState) {
                stateMonitor.wait();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resets this vertex so that it can be executed again.
     *
     * <p>Clears the slot sharing group's task assignment, resets every subtask vertex and the
     * finished-subtask bookkeeping, re-creates the input split assigner, and resets all produced
     * intermediate results.
     *
     * <p>The split assigner is re-created in both assignment modes: lazily assigned splits get a
     * fresh assigner from the split source, and pre-assigned (strictly local) splits get a fresh
     * {@link PredeterminedInputSplitAssigner}. The latter is required because that assigner
     * removes splits from its private copies as it hands them out, so the instance from the
     * previous execution is (partially) drained.
     *
     * @throws IllegalStateException if some, but not all, subtasks are in a final state
     * @throws RuntimeException if the bookkeeping is inconsistent after the reset or if
     *         re-creating the split assigner fails
     */
    public void resetForNewExecution() {
        if (this.numSubtasksInFinalState != 0 && this.numSubtasksInFinalState != this.parallelism) {
            throw new IllegalStateException("Cannot reset vertex that is not in final state");
        }
        synchronized (this.stateMonitor) {
            if (this.slotSharingGroup != null) {
                this.slotSharingGroup.clearTaskAssignment();
            }
            for (int i = 0; i < this.parallelism; ++i) {
                this.taskVertices[i].resetForNewExecution();
                if (this.finishedSubtasks[i]) {
                    this.finishedSubtasks[i] = false;
                    --this.numSubtasksInFinalState;
                }
            }
            if (this.numSubtasksInFinalState != 0) {
                throw new RuntimeException("Bug: resetting the execution job vertex failed.");
            }
            try {
                if (this.inputSplits != null) {
                    if (this.inputSplitsPerSubtask == null) {
                        // Lazy assignment: ask the source for a fresh assigner. The cast is
                        // erased at runtime and only restores static type-correctness.
                        @SuppressWarnings("unchecked")
                        InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) this.jobVertex.getInputSplitSource();
                        this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits);
                    } else {
                        // Pre-determined assignment: the previous assigner has handed out
                        // (removed) its splits, so build a new one from the untouched lists.
                        this.splitAssigner = new PredeterminedInputSplitAssigner(this.inputSplitsPerSubtask);
                    }
                }
            }
            catch (Throwable t) {
                throw new RuntimeException("Re-creating the input split assigner failed: " + t.getMessage(), t);
            }
            for (IntermediateResult result : this.producedDataSets) {
                result.resetForNewExecution();
            }
        }
    }

    /**
     * Drops references that are not needed once the job is archived.
     *
     * <p>Each subtask vertex is prepared for archiving, the input list is cleared, the produced
     * data sets, split assigner and per-subtask split lists are set to {@code null}, the slot
     * sharing / co-location groups are cleared, and every entry of the input split array is
     * nulled out (the array itself is kept).
     */
    public void prepareForArchiving() {
        for (int i = 0; i < taskVertices.length; ++i) {
            taskVertices[i].prepareForArchiving();
        }

        inputs.clear();
        producedDataSets = null;

        if (slotSharingGroup != null) {
            slotSharingGroup.clearTaskAssignment();
        }
        if (coLocationGroup != null) {
            coLocationGroup.resetConstraints();
        }

        splitAssigner = null;
        if (inputSplits != null) {
            int remaining = inputSplits.length;
            while (remaining > 0) {
                inputSplits[--remaining] = null;
            }
        }
        inputSplitsPerSubtask = null;
    }

    /**
     * Records that the given subtask finished by marking it as being in a final state.
     *
     * @param subtask index of the subtask
     */
    void vertexFinished(int subtask) {
        subtaskInFinalState(subtask);
    }

    /**
     * Records that the given subtask was cancelled by marking it as being in a final state.
     *
     * @param subtask index of the subtask
     */
    void vertexCancelled(int subtask) {
        subtaskInFinalState(subtask);
    }

    /**
     * Records that the given subtask failed by marking it as being in a final state.
     *
     * @param subtask index of the subtask
     * @param error the failure cause; not used by this method
     */
    void vertexFailed(int subtask, Throwable error) {
        subtaskInFinalState(subtask);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Marks a subtask as being in a final state; repeated calls for the same subtask are no-ops.
     *
     * <p>When the subtask is the last one outstanding, the job vertex's master-side finalization
     * hook runs first (any failure is forwarded to the graph), then the counter is bumped,
     * waiters on the state monitor are woken, and the graph is told that this job vertex is done.
     * Everything happens while holding the state monitor.
     *
     * @param subtask index of the subtask that reached a final state
     */
    private void subtaskInFinalState(int subtask) {
        synchronized (stateMonitor) {
            if (finishedSubtasks[subtask]) {
                return;
            }
            finishedSubtasks[subtask] = true;

            final boolean lastOne = numSubtasksInFinalState + 1 == parallelism;
            if (lastOne) {
                try {
                    getJobVertex().finalizeOnMaster(getGraph().getUserClassLoader());
                }
                catch (Throwable t) {
                    getGraph().fail(t);
                }
            }

            ++numSubtasksInFinalState;

            if (lastOne) {
                stateMonitor.notifyAll();
                graph.jobVertexInFinalState();
            }
        }
    }

    /**
     * Sums the four built-in I/O metrics (bytes in/out, records in/out) over the current
     * execution attempt of every subtask.
     *
     * <p>Subtasks without a metrics map, and metrics missing from a map, contribute zero.
     *
     * @return a new map holding one {@link LongCounter} per metric, each carrying the summed value
     */
    public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
        long bytesRead = 0L;
        long bytesWritten = 0L;
        long recordsRead = 0L;
        long recordsWritten = 0L;
        for (ExecutionVertex v : this.getTaskVertices()) {
            Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> metrics = v.getCurrentExecutionAttempt().getFlinkAccumulators();
            if (metrics == null) {
                continue;
            }
            LongCounter br = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_IN);
            LongCounter bw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_BYTES_OUT);
            LongCounter rr = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_IN);
            LongCounter rw = (LongCounter) metrics.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT);
            bytesRead += br != null ? br.getLocalValuePrimitive() : 0L;
            bytesWritten += bw != null ? bw.getLocalValuePrimitive() : 0L;
            recordsRead += rr != null ? rr.getLocalValuePrimitive() : 0L;
            recordsWritten += rw != null ? rw.getLocalValuePrimitive() : 0L;
        }
        // Fully typed map instead of a raw HashMap, so no unchecked conversion on return.
        Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> agg = new HashMap<AccumulatorRegistry.Metric, Accumulator<?, ?>>();
        agg.put(AccumulatorRegistry.Metric.NUM_BYTES_IN, new LongCounter(bytesRead));
        agg.put(AccumulatorRegistry.Metric.NUM_BYTES_OUT, new LongCounter(bytesWritten));
        agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_IN, new LongCounter(recordsRead));
        agg.put(AccumulatorRegistry.Metric.NUM_RECORDS_OUT, new LongCounter(recordsWritten));
        return agg;
    }

    /**
     * Merges the user accumulators of every subtask's current execution attempt and returns
     * them in stringified form. Subtasks that report no accumulator map are skipped.
     *
     * @return the stringified, merged user accumulators
     */
    public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
        // Typed map instead of a raw HashMap, so the merge and stringify calls are checked.
        Map<String, Accumulator<?, ?>> userAccumulators = new HashMap<String, Accumulator<?, ?>>();
        for (ExecutionVertex vertex : this.taskVertices) {
            Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
            if (next != null) {
                AccumulatorHelper.mergeInto(userAccumulators, next);
            }
        }
        return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
    }

    /**
     * Pre-assigns input splits to subtasks for strictly local assignment.
     *
     * <p>Splits are grouped by their single host name. Hosts are processed in sorted order and
     * each host gets {@code parallelism / numHosts} subtasks (the first
     * {@code parallelism % numHosts} hosts get one more), capped by the number of splits on that
     * host. The splits of a host are then spread as evenly as possible over its subtasks.
     * Because of the cap, trailing entries of the returned array can remain {@code null}.
     *
     * @param splits all input splits; each must be a {@link LocatableInputSplit} with exactly
     *        one non-null host name
     * @return an array with one slot per subtask holding that subtask's splits, or {@code null}
     *         for subtasks that received none
     * @throws JobException if there are fewer splits than subtasks, a split has the wrong type
     *         or invalid host information, or there are more distinct hosts than subtasks
     */
    private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
        int numSubTasks = this.getParallelism();
        if (numSubTasks > splits.length) {
            throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
        }

        // Group the splits by their (single) host.
        Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
        for (InputSplit split : splits) {
            if (!(split instanceof LocatableInputSplit)) {
                throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " + "Strictly local assignment requires LocatableInputSplit");
            }
            LocatableInputSplit lis = (LocatableInputSplit) split;
            String[] hostNames = lis.getHostnames();
            if (hostNames == null) {
                throw new JobException("LocatableInputSplit has no host information. Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
            }
            if (hostNames.length != 1) {
                throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
            }
            String hostName = hostNames[0];
            if (hostName == null) {
                throw new JobException("For strictly local input split assignment, no null host names are allowed.");
            }
            List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
            if (hostSplits == null) {
                hostSplits = new ArrayList<LocatableInputSplit>();
                splitsByHost.put(hostName, hostSplits);
            }
            hostSplits.add(lis);
        }

        int numHosts = splitsByHost.size();
        if (numSubTasks < numHosts) {
            throw new JobException("Strictly local split assignment requires at least as many parallel subtasks as distinct split hosts. Please increase the parallelism of DataSource " + this.getJobVertex().getName() + " to at least " + numHosts + ".");
        }

        // Sorted host order makes the assignment deterministic.
        List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
        Collections.sort(hosts);

        // Generic arrays cannot be created directly; the raw array only ever receives
        // List<LocatableInputSplit> instances below.
        @SuppressWarnings("unchecked")
        List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List[numSubTasks];

        int subtasksPerHost = numSubTasks / numHosts;
        int hostsWithOneMore = numSubTasks % numHosts;
        int subtaskNum = 0;
        for (int hostNum = 0; hostNum < numHosts; ++hostNum) {
            List<LocatableInputSplit> splitsOnHost = splitsByHost.get(hosts.get(hostNum));
            int numSplitsOnHost = splitsOnHost.size();

            // Never use more subtasks on a host than it has splits.
            int subtasks = Math.min(numSplitsOnHost, hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
            int splitsPerSubtask = numSplitsOnHost / subtasks;
            int subtasksWithOneMore = numSplitsOnHost % subtasks;
            int splitnum = 0;
            for (int i = 0; i < subtasks; ++i) {
                int numSplitsForSubtask = i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask;
                List<LocatableInputSplit> splitList;
                if (numSplitsForSubtask == numSplitsOnHost) {
                    // A single subtask takes everything on this host; reuse the list as is.
                    splitList = splitsOnHost;
                } else {
                    splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
                    for (int k = 0; k < numSplitsForSubtask; ++k) {
                        splitList.add(splitsOnHost.get(splitnum++));
                    }
                }
                subTaskSplitAssignment[subtaskNum++] = splitList;
            }
        }
        return subTaskSplitAssignment;
    }

    /**
     * Folds per-state subtask counts into a single state for the whole job vertex.
     *
     * <p>The first state with a positive count in the order FAILED, CANCELING, CANCELED, RUNNING
     * wins. Otherwise, if any subtask is FINISHED, the result is FINISHED when all
     * {@code parallelism} subtasks are finished and RUNNING when only some are. If none of these
     * apply the result is CREATED.
     *
     * @param verticesPerState subtask counts indexed by {@link ExecutionState} ordinal
     * @param parallelism total number of subtasks
     * @return the aggregated state
     * @throws IllegalArgumentException if the array is null or its length differs from the
     *         number of execution states
     */
    public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
        final boolean validLength = verticesPerState != null && verticesPerState.length == ExecutionState.values().length;
        if (!validLength) {
            throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
        }

        // Checked in priority order; the first state with at least one subtask is the answer.
        final ExecutionState[] dominating = {
            ExecutionState.FAILED, ExecutionState.CANCELING, ExecutionState.CANCELED, ExecutionState.RUNNING
        };
        for (ExecutionState candidate : dominating) {
            if (verticesPerState[candidate.ordinal()] > 0) {
                return candidate;
            }
        }

        final int finished = verticesPerState[ExecutionState.FINISHED.ordinal()];
        if (finished <= 0) {
            return ExecutionState.CREATED;
        }
        if (finished == parallelism) {
            return ExecutionState.FINISHED;
        }
        return ExecutionState.RUNNING;
    }

    /**
     * Split assigner that hands out splits from a fixed, per-subtask list.
     *
     * <p>The lists passed to the constructor are copied, so handing out splits does not modify
     * the caller's lists.
     */
    public static class PredeterminedInputSplitAssigner
    implements InputSplitAssigner {
        /** Remaining splits per subtask index; entries are never null. */
        private List<LocatableInputSplit>[] inputSplitsPerSubtask;

        /**
         * @param inputSplitsPerSubtask splits for each subtask index; null or empty entries are
         *        treated as "no splits"
         */
        public PredeterminedInputSplitAssigner(List<LocatableInputSplit>[] inputSplitsPerSubtask) {
            final int numSubtasks = inputSplitsPerSubtask.length;
            this.inputSplitsPerSubtask = new List[numSubtasks];
            for (int subtask = 0; subtask < numSubtasks; ++subtask) {
                List<LocatableInputSplit> given = inputSplitsPerSubtask[subtask];
                List<LocatableInputSplit> own;
                if (given != null && !given.isEmpty()) {
                    own = new ArrayList<LocatableInputSplit>(given);
                } else {
                    own = Collections.<LocatableInputSplit>emptyList();
                }
                this.inputSplitsPerSubtask[subtask] = own;
            }
        }

        /**
         * Removes and returns the last remaining split of the given subtask.
         *
         * @param host not used by this assigner
         * @param taskId index of the requesting subtask
         * @return the next split, or {@code null} when the subtask has none left
         */
        public InputSplit getNextInputSplit(String host, int taskId) {
            List<LocatableInputSplit> remaining = this.inputSplitsPerSubtask[taskId];
            if (remaining.isEmpty()) {
                return null;
            }
            int lastIndex = remaining.size() - 1;
            return remaining.remove(lastIndex);
        }
    }
}

