package alluxio.job.plan.load;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.exception.status.FailedPreconditionException;
import alluxio.job.JobConfig;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.util.CommonUtils;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.TieredIdentity;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.MoreObjects;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/job/plan/load/LoadDefinition.class */
public final class LoadDefinition extends AbstractVoidPlanDefinition<LoadConfig, ArrayList<LoadTask>> {
    private static final Logger LOG = LoggerFactory.getLogger(LoadDefinition.class);
    private static final int MAX_BUFFER_SIZE = 524288000;
    private static final int JOBS_PER_WORKER = 10;

    /* loaded from: input_file:alluxio/job/plan/load/LoadDefinition$LoadTask.class */
    public static class LoadTask implements Serializable {
        private static final long serialVersionUID = 2028545900913354425L;
        final long mBlockId;
        final WorkerNetAddress mWorkerNetAddress;

        public LoadTask(long j, WorkerNetAddress workerNetAddress) {
            this.mBlockId = j;
            this.mWorkerNetAddress = workerNetAddress;
        }

        public long getBlockId() {
            return this.mBlockId;
        }

        public WorkerNetAddress getWorkerNetAddress() {
            return this.mWorkerNetAddress;
        }

        public String toString() {
            return MoreObjects.toStringHelper(this).add("blockId", this.mBlockId).add("workerNetAddress", this.mWorkerNetAddress).toString();
        }
    }

    public Set<Pair<WorkerInfo, ArrayList<LoadTask>>> selectExecutors(LoadConfig loadConfig, List<WorkerInfo> list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        Map map = (Map) list.stream().collect(Collectors.toMap(workerInfo -> {
            return workerInfo.getAddress().getHost();
        }, workerInfo2 -> {
            return workerInfo2;
        }));
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (BlockWorkerInfo blockWorkerInfo : selectExecutorsContext.getFsContext().getCachedWorkers()) {
            if (map.containsKey(blockWorkerInfo.getNetAddress().getHost())) {
                String upperCase = blockWorkerInfo.getNetAddress().getHost().toUpperCase();
                if (isEmptySet(loadConfig.getExcludedWorkerSet()) || !loadConfig.getExcludedWorkerSet().contains(upperCase)) {
                    boolean z = false;
                    if (blockWorkerInfo.getNetAddress().getTieredIdentity().getTiers() != null && (!isEmptySet(loadConfig.getLocalityIds()) || !isEmptySet(loadConfig.getExcludedLocalityIds()))) {
                        boolean z2 = false;
                        Iterator it = blockWorkerInfo.getNetAddress().getTieredIdentity().getTiers().iterator();
                        while (true) {
                            if (!it.hasNext()) {
                                break;
                            }
                            TieredIdentity.LocalityTier localityTier = (TieredIdentity.LocalityTier) it.next();
                            if (!isEmptySet(loadConfig.getExcludedLocalityIds()) && loadConfig.getExcludedLocalityIds().contains(localityTier.getValue().toUpperCase())) {
                                z2 = true;
                                break;
                            }
                            if (!isEmptySet(loadConfig.getLocalityIds()) && loadConfig.getLocalityIds().contains(localityTier.getValue().toUpperCase())) {
                                z = true;
                                break;
                            }
                        }
                        if (z2) {
                        }
                    }
                    if ((isEmptySet(loadConfig.getWorkerSet()) && isEmptySet(loadConfig.getLocalityIds())) || z || (!isEmptySet(loadConfig.getWorkerSet()) && loadConfig.getWorkerSet().contains(upperCase))) {
                        arrayList2.add(blockWorkerInfo);
                    }
                }
            } else {
                LOG.warn("Worker on host {} has no local job worker", blockWorkerInfo.getNetAddress().getHost());
                arrayList.add(blockWorkerInfo.getNetAddress().getHost());
            }
        }
        LinkedListMultimap create = LinkedListMultimap.create();
        for (FileBlockInfo fileBlockInfo : selectExecutorsContext.getFileSystem().getStatus(new AlluxioURI(loadConfig.getFilePath())).getFileBlockInfos()) {
            List<BlockWorkerInfo> workersWithoutBlock = getWorkersWithoutBlock(arrayList2, fileBlockInfo);
            int replication = loadConfig.getReplication() - fileBlockInfo.getBlockInfo().getLocations().size();
            if (workersWithoutBlock.size() < replication) {
                throw new FailedPreconditionException(String.format("Failed to find enough block workers to replicate to. Needed %s but only found %s. Available workers without the block: %s" + (arrayList.isEmpty() ? "" : ". The following workers could not be used because they have no local job workers: " + arrayList), Integer.valueOf(replication), Integer.valueOf(workersWithoutBlock.size()), workersWithoutBlock));
            }
            Collections.shuffle(workersWithoutBlock);
            for (int i = 0; i < replication; i++) {
                create.put((WorkerInfo) map.get(workersWithoutBlock.get(i).getNetAddress().getHost()), new LoadTask(fileBlockInfo.getBlockInfo().getBlockId(), workersWithoutBlock.get(i).getNetAddress()));
            }
        }
        HashSet newHashSet = Sets.newHashSet();
        for (Map.Entry entry : create.asMap().entrySet()) {
            for (List list2 : CommonUtils.partition(Lists.newArrayList((Collection) entry.getValue()), JOBS_PER_WORKER)) {
                if (!list2.isEmpty()) {
                    newHashSet.add(new Pair(entry.getKey(), Lists.newArrayList(list2)));
                }
            }
        }
        return newHashSet;
    }

    private List<BlockWorkerInfo> getWorkersWithoutBlock(List<BlockWorkerInfo> list, FileBlockInfo fileBlockInfo) {
        List list2 = (List) fileBlockInfo.getBlockInfo().getLocations().stream().map((v0) -> {
            return v0.getWorkerAddress();
        }).collect(Collectors.toList());
        return (List) list.stream().filter(blockWorkerInfo -> {
            return !list2.contains(blockWorkerInfo.getNetAddress());
        }).collect(Collectors.toList());
    }

    @Override // alluxio.job.plan.PlanDefinition
    public SerializableVoid runTask(LoadConfig loadConfig, ArrayList<LoadTask> arrayList, RunTaskContext runTaskContext) throws Exception {
        URIStatus status = runTaskContext.getFileSystem().getStatus(new AlluxioURI(loadConfig.getFilePath()));
        Iterator<LoadTask> it = arrayList.iterator();
        while (it.hasNext()) {
            LoadTask next = it.next();
            JobUtils.loadBlock(status, runTaskContext.getFsContext(), next.getBlockId(), next.getWorkerNetAddress(), loadConfig.isDirectCache());
            LOG.info("Loaded file " + loadConfig.getFilePath() + " block " + next.getBlockId());
        }
        return null;
    }

    private boolean isEmptySet(Set set) {
        return set == null || set.isEmpty();
    }

    @Override // alluxio.job.plan.PlanDefinition
    public Class<LoadConfig> getJobConfigClass() {
        return LoadConfig.class;
    }

    @Override // alluxio.job.plan.PlanDefinition
    public /* bridge */ /* synthetic */ Set selectExecutors(JobConfig jobConfig, List list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        return selectExecutors((LoadConfig) jobConfig, (List<WorkerInfo>) list, selectExecutorsContext);
    }
}
