/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive.allocator;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.adaptive.allocator.StateSizeEstimates;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/**
 * Snapshot of the per-subtask allocation details of a job, grouped by {@link JobVertexID}.
 *
 * <p>For every execution vertex of the graph it records the allocation assigned to the vertex's
 * current execution attempt, the key group range of the subtask and an estimate of its state
 * size.
 */
@Internal
public class JobAllocationsInformation {
    /** Allocation details of each job vertex; one list element per recorded subtask. */
    private final Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations;

    /**
     * Wraps the given mapping. The map is stored as-is (no defensive copy is made).
     *
     * @param vertexAllocations allocation details keyed by job vertex ID
     */
    JobAllocationsInformation(Map<JobVertexID, List<VertexAllocationInformation>> vertexAllocations) {
        this.vertexAllocations = vertexAllocations;
    }

    /**
     * Builds the allocation information from an execution graph, using state size estimates
     * obtained via {@link StateSizeEstimates#fromGraphAndState}.
     *
     * @param graph execution graph whose vertices are inspected
     * @param latestCheckpoint checkpoint handed to {@link StateSizeEstimates#fromGraphAndState}
     * @return allocation information for all execution vertices of {@code graph}
     */
    public static JobAllocationsInformation fromGraphAndState(ExecutionGraph graph, CompletedCheckpoint latestCheckpoint) {
        StateSizeEstimates stateSizeEstimates = StateSizeEstimates.fromGraphAndState(graph, latestCheckpoint);
        return new JobAllocationsInformation(calculateAllocations(graph, stateSizeEstimates));
    }

    /**
     * Returns the allocations recorded for the given job vertex.
     *
     * @param jobVertexID ID of the job vertex to look up
     * @return the recorded allocations, or an empty list if none are known for the vertex
     */
    public List<VertexAllocationInformation> getAllocations(JobVertexID jobVertexID) {
        return this.vertexAllocations.getOrDefault(jobVertexID, Collections.emptyList());
    }

    /**
     * Collects one {@link VertexAllocationInformation} per execution vertex of the graph.
     *
     * @param graph execution graph to walk (job vertices are visited in topological order)
     * @param stateSizeEstimates source of the per-vertex state size estimates
     * @return mutable map from job vertex ID to the allocations of its subtasks; job vertices
     *     without any task vertices get no entry
     */
    private static Map<JobVertexID, List<VertexAllocationInformation>> calculateAllocations(ExecutionGraph graph, StateSizeEstimates stateSizeEstimates) {
        Map<JobVertexID, List<VertexAllocationInformation>> allocations = new HashMap<>();
        for (ExecutionJobVertex vertex : graph.getVerticesTopologically()) {
            JobVertexID jobVertexId = vertex.getJobVertexId();
            for (ExecutionVertex executionVertex : vertex.getTaskVertices()) {
                // A missing estimate is treated as "no state" (0 bytes).
                long stateSize = stateSizeEstimates.estimate(executionVertex.getID()).orElse(0L);
                AllocationID allocationId = executionVertex.getCurrentExecutionAttempt().getAssignedAllocationID();
                KeyGroupRange kgr = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        vertex.getMaxParallelism(),
                        vertex.getParallelism(),
                        executionVertex.getParallelSubtaskIndex());
                // The list is created lazily inside the loop so that a job vertex only gets an
                // entry once at least one of its subtasks has been recorded.
                allocations
                        .computeIfAbsent(jobVertexId, ignored -> new ArrayList<>())
                        .add(new VertexAllocationInformation(allocationId, jobVertexId, kgr, stateSize));
            }
        }
        return allocations;
    }

    /**
     * Creates an instance that holds no allocations.
     *
     * @return allocation information backed by an empty map
     */
    public static JobAllocationsInformation empty() {
        return new JobAllocationsInformation(Collections.emptyMap());
    }

    /**
     * Tells whether any allocations are recorded.
     *
     * @return {@code true} if no job vertex has an entry
     */
    public boolean isEmpty() {
        return this.vertexAllocations.isEmpty();
    }

    /** Allocation details of a single subtask of a job vertex. */
    public static class VertexAllocationInformation {
        private final AllocationID allocationID;
        private final JobVertexID jobVertexID;
        private final KeyGroupRange keyGroupRange;
        /** State size of the subtask in bytes; exposed directly as a public field. */
        public final long stateSizeInBytes;

        /**
         * Creates the allocation details of one subtask.
         *
         * @param allocationID allocation associated with the subtask
         * @param jobVertexID job vertex the subtask belongs to
         * @param keyGroupRange key group range of the subtask
         * @param stateSizeInBytes state size of the subtask in bytes
         */
        public VertexAllocationInformation(AllocationID allocationID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, long stateSizeInBytes) {
            this.allocationID = allocationID;
            this.jobVertexID = jobVertexID;
            this.keyGroupRange = keyGroupRange;
            this.stateSizeInBytes = stateSizeInBytes;
        }

        /** @return the allocation associated with the subtask */
        public AllocationID getAllocationID() {
            return this.allocationID;
        }

        /** @return the job vertex the subtask belongs to */
        public JobVertexID getJobVertexID() {
            return this.jobVertexID;
        }

        /** @return the key group range of the subtask */
        public KeyGroupRange getKeyGroupRange() {
            return this.keyGroupRange;
        }
    }
}

