/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hive.ql.exec.tez;

import io.trino.hive.;
import io.trino.hive.$internal.com.google.common.base.Function;
import io.trino.hive.$internal.com.google.common.base.Preconditions;
import io.trino.hive.$internal.com.google.common.collect.Iterators;
import io.trino.hive.$internal.com.google.common.collect.Lists;
import io.trino.hive.$internal.org.apache.commons.lang.StringUtils;
import io.trino.hive.$internal.org.slf4j.Logger;
import io.trino.hive.$internal.org.slf4j.LoggerFactory;
import java.io.DataOutput;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.ZipOutputStream;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.tez.CustomEdgeConfiguration;
import org.apache.hadoop.hive.ql.exec.tez.CustomPartitionEdge;
import org.apache.hadoop.hive.ql.exec.tez.CustomPartitionVertex;
import org.apache.hadoop.hive.ql.exec.tez.CustomVertexConfiguration;
import org.apache.hadoop.hive.ql.exec.tez.HivePreWarmProcessor;
import org.apache.hadoop.hive.ql.exec.tez.HiveSplitGenerator;
import org.apache.hadoop.hive.ql.exec.tez.MapTezProcessor;
import org.apache.hadoop.hive.ql.exec.tez.MergeFileTezProcessor;
import org.apache.hadoop.hive.ql.exec.tez.ReduceTezProcessor;
import org.apache.hadoop.hive.ql.exec.tez.tools.TezMergedLogicalInput;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileOutputFormat;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsCollectionContext;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.PreWarmVertex;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.library.api.Partitioner;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductConfig;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductEdgeManager;
import org.apache.tez.runtime.library.cartesianproduct.CartesianProductVertexManager;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;

public class DagUtils {
    /** Configuration key under which the map-vertex configuration stores the URI of the query's MR temp path. */
    public static final String TEZ_TMP_DIR_KEY = "_hive_tez_tmp_dir";
    private static final Logger LOG = LoggerFactory.getLogger(DagUtils.class.getName());
    // NOTE(review): presumably the name of the Tez scratch sub-directory; confirm against its users further down the file.
    private static final String TEZ_DIR = "_tez_scratch_dir";
    // NOTE(review): looks like a shared singleton instance; the accessor is not visible here.
    private static final DagUtils instance = new DagUtils();
    /** Configuration key set to the name of the map work currently being added as a merge-join input. */
    public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX = "hive.tez.current.merge.file.prefix";
    // NOTE(review): presumably lists the prefixes of all merge works; confirm where it is written.
    public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
    // Explicit type arguments avoid the raw-type (unchecked) construction.
    // NOTE(review): presumably per-key monitor objects used while copying resources; verify against users.
    private final ConcurrentHashMap<String, Object> copyNotifiers = new ConcurrentHashMap<String, Object>();

    /**
     * Marks the URI of every input path of the given map work on the DAG as
     * needing credentials.
     *
     * <p>Does nothing when the work has no input paths.
     *
     * @param mapWork map work whose {@code getPathToAliases()} keys are the input paths
     * @param dag DAG that receives the URIs through {@code addURIsForCredentials}
     */
    private void addCredentials(MapWork mapWork, DAG dag) {
        Set<Path> paths = mapWork.getPathToAliases().keySet();
        if (paths.isEmpty()) {
            return;
        }
        // A typed set keeps the element type checked and collapses paths that share a URI.
        Set<URI> uris = new HashSet<URI>();
        for (Path path : paths) {
            uris.add(path.toUri());
        }
        if (LOG.isDebugEnabled()) {
            for (URI uri : uris) {
                LOG.debug("Marking URI as needing credentials: " + uri);
            }
        }
        dag.addURIsForCredentials(uris);
    }

    /**
     * Reduce-work counterpart of the map-work credential hook. The body is empty,
     * so no URIs are registered on the DAG for reduce work.
     *
     * @param reduceWork reduce work (unused)
     * @param dag DAG being built (unused)
     */
    private void addCredentials(ReduceWork reduceWork, DAG dag) {
    }

    /**
     * Builds the job configuration for a map vertex as a copy of {@code baseConf}.
     *
     * <p>The copy receives the work name, any split-size / map-count hints present
     * on the work, the input attributes, the temp-dir URI, and the mapper and
     * input-format classes. For a {@link MergeFileWork} the mapper, input format
     * and output format are overridden afterwards.
     *
     * @param baseConf configuration to copy; it is not modified
     * @param context query context supplying the MR temp path
     * @param mapWork map work the vertex will execute
     * @return the new vertex-specific configuration
     */
    private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
        JobConf vertexConf = new JobConf(baseConf);
        vertexConf.set("__hive.context.name", mapWork.getName());

        // Only hints the plan actually carries are copied into the configuration.
        if (mapWork.getNumMapTasks() != null) {
            vertexConf.setInt("mapreduce.job.maps", mapWork.getNumMapTasks().intValue());
        }
        if (mapWork.getMaxSplitSize() != null) {
            HiveConf.setLongVar(vertexConf, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, mapWork.getMaxSplitSize());
        }
        if (mapWork.getMinSplitSize() != null) {
            HiveConf.setLongVar(vertexConf, HiveConf.ConfVars.MAPREDMINSPLITSIZE, mapWork.getMinSplitSize());
        }
        if (mapWork.getMinSplitSizePerNode() != null) {
            HiveConf.setLongVar(vertexConf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, mapWork.getMinSplitSizePerNode());
        }
        if (mapWork.getMinSplitSizePerRack() != null) {
            HiveConf.setLongVar(vertexConf, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, mapWork.getMinSplitSizePerRack());
        }
        Utilities.setInputAttributes(vertexConf, mapWork);

        // Start from the configured Tez input format; the later check wins when both flags are set.
        String inputFormatName = HiveConf.getVar(vertexConf, HiveConf.ConfVars.HIVETEZINPUTFORMAT);
        if (mapWork.isUseBucketizedHiveInputFormat()) {
            inputFormatName = BucketizedHiveInputFormat.class.getName();
        }
        if (mapWork.getDummyTableScan()) {
            inputFormatName = CombineHiveInputFormat.class.getName();
        }

        vertexConf.set(TEZ_TMP_DIR_KEY, context.getMRTmpPath().toUri().toString());
        vertexConf.set("mapred.mapper.class", ExecMapper.class.getName());
        vertexConf.set("mapred.input.format.class", inputFormatName);

        // File-merge work replaces the defaults set just above.
        if (mapWork instanceof MergeFileWork) {
            MergeFileWork mergeFileWork = (MergeFileWork)mapWork;
            vertexConf.set("mapred.mapper.class", MergeFileMapper.class.getName());
            vertexConf.set("mapred.input.format.class", mergeFileWork.getInputformat());
            vertexConf.setClass("mapred.output.format.class", MergeFileOutputFormat.class, FileOutputFormat.class);
        }
        return vertexConf;
    }

    /**
     * Creates the edge connecting a vertex group to the downstream vertex {@code w}.
     *
     * <p>The merged-input class is {@link ConcatenatedMergedKeyValueInput} for
     * broadcast, custom, custom-simple, one-to-one and cross-product edges, and
     * {@link TezMergedLogicalInput} for simple edges and any other type. A custom
     * edge additionally installs a {@link CustomPartitionVertex} vertex manager on
     * {@code w}; a simple edge sets up auto reducer parallelism on {@code w}.
     *
     * @param group upstream vertex group
     * @param vConf configuration used to derive the edge property
     * @param w downstream vertex
     * @param edgeProp Hive-level description of the edge
     * @param work work item passed on to the vertex-type lookup and edge-property creation
     * @param tezWork enclosing Tez work graph
     * @return the group input edge
     * @throws IOException if serializing the custom vertex configuration or building the edge property fails
     */
    public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w, TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork) throws IOException {
        LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getName());

        // Most edge types concatenate their inputs; only the simple/default path differs.
        Class<?> mergeInputClass = ConcatenatedMergedKeyValueInput.class;
        switch (edgeProp.getEdgeType()) {
            case CUSTOM_EDGE: {
                CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(edgeProp.getNumBuckets(), tezWork.getVertexType(work));
                DataOutputBuffer serialized = new DataOutputBuffer();
                vertexConf.write(serialized);
                VertexManagerPluginDescriptor managerDesc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
                managerDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(serialized.getData())));
                w.setVertexManagerPlugin(managerDesc);
                break;
            }
            case BROADCAST_EDGE:
            case CUSTOM_SIMPLE_EDGE:
            case ONE_TO_ONE_EDGE:
            case XPROD_EDGE: {
                break;
            }
            case SIMPLE_EDGE: {
                this.setupAutoReducerParallelism(edgeProp, w);
                // deliberate fall through: simple edges share the default merged input
            }
            default: {
                mergeInputClass = TezMergedLogicalInput.class;
            }
        }

        EdgeProperty property = this.createEdgeProperty(w, edgeProp, vConf, work, tezWork);
        InputDescriptor mergedInput = InputDescriptor.create(mergeInputClass.getName());
        return GroupInputEdge.create(group, w, property, mergedInput);
    }

    /**
     * Creates the edge from vertex {@code v} to vertex {@code w}.
     *
     * <p>Before the edge is built, the downstream vertex is prepared according to
     * the edge type: a custom edge installs a {@link CustomPartitionVertex} vertex
     * manager, a simple edge sets up auto reducer parallelism, and a custom-simple
     * edge sets up quick start. Other edge types need no preparation here.
     *
     * @param vConf configuration used to derive the edge property
     * @param v upstream vertex
     * @param w downstream vertex
     * @param edgeProp Hive-level description of the edge
     * @param work work item passed on to the vertex-type lookup and edge-property creation
     * @param tezWork enclosing Tez work graph
     * @return the edge between the two vertices
     * @throws IOException if serializing the custom vertex configuration or building the edge property fails
     */
    public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp, BaseWork work, TezWork tezWork) throws IOException {
        switch (edgeProp.getEdgeType()) {
            case SIMPLE_EDGE: {
                this.setupAutoReducerParallelism(edgeProp, w);
                break;
            }
            case CUSTOM_SIMPLE_EDGE: {
                this.setupQuickStart(edgeProp, w);
                break;
            }
            case CUSTOM_EDGE: {
                CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(edgeProp.getNumBuckets(), tezWork.getVertexType(work));
                DataOutputBuffer serialized = new DataOutputBuffer();
                vertexConf.write(serialized);
                VertexManagerPluginDescriptor managerDesc = VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
                managerDesc.setUserPayload(UserPayload.create(ByteBuffer.wrap(serialized.getData())));
                w.setVertexManagerPlugin(managerDesc);
                break;
            }
            case XPROD_EDGE:
            default: {
                // nothing to prepare on the downstream vertex
                break;
            }
        }
        EdgeProperty property = this.createEdgeProperty(w, edgeProp, vConf, work, tezWork);
        return Edge.create(v, w, property);
    }

    /**
     * Translates a Hive edge description into a Tez {@link EdgeProperty}.
     *
     * <p>The MR settings in {@code conf} are first translated to Tez settings, then
     * the key class, value class and partitioner class are read back from it. The
     * edge type selects the configuration builder:
     * broadcast and one-to-one use an unordered KV config; custom and custom-simple
     * use an unordered partitioned KV config (custom also attaches a
     * {@link CustomPartitionEdge} manager); cross-product uses a
     * {@link CartesianProductEdgeManager}; every other type gets an ordered
     * partitioned KV config.
     *
     * @param w downstream vertex (not used by this method)
     * @param edgeProp Hive-level description of the edge
     * @param conf configuration to translate and read; it is modified by the translation
     * @param work downstream work, used to find cross-product parents
     * @param tezWork enclosing Tez work graph
     * @return the Tez edge property
     * @throws IOException if serializing an edge-manager payload fails
     */
    private EdgeProperty createEdgeProperty(Vertex w, TezEdgeProperty edgeProp, Configuration conf, BaseWork work, TezWork tezWork) throws IOException {
        MRHelpers.translateMRConfToTez(conf);
        final String keyClass = conf.get("tez.runtime.key.class");
        final String valClass = conf.get("tez.runtime.value.class");
        final String partitionerClassName = conf.get("mapred.partitioner.class");
        // Every edge flavour below serializes keys and values the same way.
        final String serialization = TezBytesWritableSerialization.class.getName();

        switch (edgeProp.getEdgeType()) {
            case BROADCAST_EDGE: {
                UnorderedKVEdgeConfig broadcastConf = UnorderedKVEdgeConfig.newBuilder(keyClass, valClass)
                    .setFromConfiguration(conf)
                    .setKeySerializationClass(serialization, null)
                    .setValueSerializationClass(serialization, null)
                    .build();
                return broadcastConf.createDefaultBroadcastEdgeProperty();
            }
            case CUSTOM_EDGE: {
                assert (partitionerClassName != null);
                Map<String, String> partitionerConf = this.createPartitionerConf(partitionerClassName, conf);
                UnorderedPartitionedKVEdgeConfig customConf = UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
                    .setFromConfiguration(conf)
                    .setKeySerializationClass(serialization, null)
                    .setValueSerializationClass(serialization, null)
                    .build();
                EdgeManagerPluginDescriptor edgeManager = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
                CustomEdgeConfiguration edgeConf = new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
                DataOutputBuffer serialized = new DataOutputBuffer();
                edgeConf.write(serialized);
                edgeManager.setUserPayload(UserPayload.create(ByteBuffer.wrap(serialized.getData())));
                return customConf.createDefaultCustomEdgeProperty(edgeManager);
            }
            case CUSTOM_SIMPLE_EDGE: {
                assert (partitionerClassName != null);
                Map<String, String> partitionerConf = this.createPartitionerConf(partitionerClassName, conf);
                UnorderedPartitionedKVEdgeConfig customSimpleConf = UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
                    .setFromConfiguration(conf)
                    .setKeySerializationClass(serialization, null)
                    .setValueSerializationClass(serialization, null)
                    .build();
                return customSimpleConf.createDefaultEdgeProperty();
            }
            case ONE_TO_ONE_EDGE: {
                UnorderedKVEdgeConfig oneToOneConf = UnorderedKVEdgeConfig.newBuilder(keyClass, valClass)
                    .setFromConfiguration(conf)
                    .setKeySerializationClass(serialization, null)
                    .setValueSerializationClass(serialization, null)
                    .build();
                return oneToOneConf.createDefaultOneToOneEdgeProperty();
            }
            case XPROD_EDGE: {
                EdgeManagerPluginDescriptor edgeManager = EdgeManagerPluginDescriptor.create(CartesianProductEdgeManager.class.getName());
                // Only parents that reach this work over a cross-product edge are sources.
                List<String> crossProductSources = new ArrayList<String>();
                for (BaseWork parentWork : tezWork.getParents(work)) {
                    if (TezEdgeProperty.EdgeType.XPROD_EDGE == tezWork.getEdgeType(parentWork, work)) {
                        crossProductSources.add(parentWork.getName());
                    }
                }
                CartesianProductConfig cpConfig = new CartesianProductConfig(crossProductSources);
                edgeManager.setUserPayload(cpConfig.toUserPayload(new TezConfiguration(conf)));
                UnorderedPartitionedKVEdgeConfig crossProductConf = UnorderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, ValueHashPartitioner.class.getName()).build();
                return crossProductConf.createDefaultCustomEdgeProperty(edgeManager);
            }
            default: {
                // Simple edges and anything unrecognised: ordered, partitioned shuffle.
                assert (partitionerClassName != null);
                Map<String, String> partitionerConf = this.createPartitionerConf(partitionerClassName, conf);
                OrderedPartitionedKVEdgeConfig orderedConf = OrderedPartitionedKVEdgeConfig.newBuilder(keyClass, valClass, MRPartitioner.class.getName(), partitionerConf)
                    .setFromConfiguration(conf)
                    .setKeySerializationClass(serialization, TezBytesComparator.class.getName(), null)
                    .setValueSerializationClass(serialization, null)
                    .build();
                return orderedConf.createDefaultEdgeProperty();
            }
        }
    }

    /**
     * Collects the settings handed to the partitioner of an edge.
     *
     * @param partitionerClassName value stored under {@code mapred.partitioner.class}
     * @param baseConf configuration from which the total-order partitioner path is copied when present
     * @return a new mutable map holding the partitioner class and, if set, the total-order partitioner path
     */
    private Map<String, String> createPartitionerConf(String partitionerClassName, Configuration baseConf) {
        final String totalOrderPathKey = "mapreduce.totalorderpartitioner.path";

        Map<String, String> settings = new HashMap<String, String>();
        settings.put("mapred.partitioner.class", partitionerClassName);

        String totalOrderPath = baseConf.get(totalOrderPathKey);
        if (totalOrderPath != null) {
            settings.put(totalOrderPathKey, totalOrderPath);
        }
        return settings;
    }

    /**
     * Determines the container resource (memory and virtual cores) for Tez tasks.
     *
     * <p>The Hive Tez container size and vcore settings are used when they are
     * positive; otherwise the values fall back to {@code mapreduce.map.memory.mb}
     * (default 1024) and {@code mapreduce.map.cpu.vcores} (default 1).
     *
     * @param conf configuration to read
     * @return the resource to request for each task container
     */
    public static Resource getContainerResource(Configuration conf) {
        int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE);
        if (memory <= 0) {
            memory = conf.getInt("mapreduce.map.memory.mb", 1024);
        }

        int cpus = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES);
        if (cpus <= 0) {
            cpus = conf.getInt("mapreduce.map.cpu.vcores", 1);
        }

        return Resource.newInstance(memory, cpus);
    }

    /**
     * Builds the environment variables for a task container from the MR task
     * environment settings in {@code conf}.
     *
     * @param conf configuration to read
     * @param isMap whether the map-side (rather than reduce-side) settings are requested
     * @return a new mutable map populated by {@code MRHelpers.updateEnvBasedOnMRTaskEnv}
     */
    private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
        Map<String, String> taskEnv = new HashMap<String, String>();
        MRHelpers.updateEnvBasedOnMRTaskEnv(conf, taskEnv, isMap);
        return taskEnv;
    }

    /**
     * Assembles the JVM launch options for a task container.
     *
     * <p>The log4j system properties for the configured Tez log level are always
     * included. When the Hive Tez container size is positive they are appended to
     * the Hive Tez Java options (if any); otherwise the Hive Tez Java options are
     * ignored (with a warning when non-empty) and the MR mapper Java options are
     * used instead.
     *
     * @param conf configuration to read
     * @return the command-line options string
     */
    private static String getContainerJavaOpts(Configuration conf) {
        String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
        String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);

        // Render the log4j properties as a space-terminated option list.
        List<String> logProps = Lists.newArrayList();
        TezUtils.addLog4jSystemProperties(logLevel, logProps);
        StringBuilder joined = new StringBuilder();
        for (String prop : logProps) {
            joined.append(prop).append(" ");
        }
        String logOpts = joined.toString();

        boolean containerSizeSet = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0;
        if (!containerSizeSet) {
            if (javaOpts != null && !javaOpts.isEmpty()) {
                LOG.warn(HiveConf.ConfVars.HIVETEZJAVAOPTS + " will be ignored because " + HiveConf.ConfVars.HIVETEZCONTAINERSIZE + " is not set!");
            }
            return logOpts + " " + MRHelpers.getJavaOptsForMRMapper(conf);
        }
        return javaOpts != null ? javaOpts + " " + logOpts : logOpts;
    }

    /**
     * Creates the vertex for a merge-join work.
     *
     * <p>The merge work is first registered through {@code Utilities.setMergeWork}.
     * When the main work is a {@link MapWork}, the vertex is created from that map
     * work, one {@link MultiMRInput} data source is added per entry of
     * {@code getBaseWorkList()}, and a {@link CustomPartitionVertex} vertex manager
     * carrying a serialized {@link CustomVertexConfiguration} is installed.
     * Otherwise the main work is treated as a {@link ReduceWork} and the
     * reduce-work overload is used.
     *
     * @param conf vertex configuration; it is modified by this method
     * @param mergeJoinWork merge-join work to turn into a vertex
     * @param fs file system, passed through to the delegate
     * @param mrScratchDir scratch directory used when registering the work
     * @param ctx query context, passed through to the delegate
     * @param vertexType vertex type recorded in the custom vertex configuration
     * @param localResources local resources, passed through to the delegate
     * @return the created vertex
     * @throws Exception if any of the delegated steps fails
     */
    private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, FileSystem fs, Path mrScratchDir, Context ctx, TezWork.VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
        Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
        if (mergeJoinWork.getMainWork() instanceof MapWork) {
            List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
            MapWork mapWork = (MapWork)mergeJoinWork.getMainWork();
            Vertex mergeVx = this.createVertex(conf, mapWork, fs, mrScratchDir, ctx, vertexType, localResources);
            // The additional inputs below always use HiveInputFormat, without serialized event payloads.
            conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
            conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
            for (int i = 0; i < mapWorkList.size(); ++i) {
                mapWork = (MapWork)mapWorkList.get(i);
                // conf is updated per work before the data source is built from it.
                conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
                conf.set("iocontext.input.name", mapWork.getName());
                LOG.info("Going through each work and adding MultiMRInput");
                mergeVx.addDataSource(mapWork.getName(), MultiMRInput.createConfigBuilder((Configuration)conf, HiveInputFormat.class).build());
            }
            // Maps each work name to the bucket count of its first operator; filled only when the
            // merge-join operator has exactly one parent operator and non-null op traits.
            HashMap<String, Integer> inputToBucketMap = new HashMap<String, Integer>();
            if (mergeJoinWork.getMergeJoinOperator().getParentOperators().size() == 1 && mergeJoinWork.getMergeJoinOperator().getOpTraits() != null) {
                for (BaseWork work : mapWorkList) {
                    MapWork mw = (MapWork)work;
                    LinkedHashMap<String, Operator<? extends OperatorDesc>> aliasToWork = mw.getAliasToWork();
                    Preconditions.checkState(aliasToWork.size() == 1, "More than 1 alias in SMB mapwork");
                    inputToBucketMap.put(mw.getName(), mw.getWorks().get(0).getOpTraits().getNumBuckets());
                }
            }
            VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create((String)CustomPartitionVertex.class.getName());
            // Input count is the listed works plus one (presumably the main work — TODO confirm).
            CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(((CommonMergeJoinDesc)mergeJoinWork.getMergeJoinOperator().getConf()).getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(), mapWorkList.size() + 1, inputToBucketMap);
            DataOutputBuffer dob = new DataOutputBuffer();
            vertexConf.write((DataOutput)dob);
            // NOTE(review): getData() exposes the whole backing array, not only the written length — confirm the reader tolerates trailing bytes.
            byte[] userPayload = dob.getData();
            desc.setUserPayload(UserPayload.create((ByteBuffer)ByteBuffer.wrap(userPayload)));
            mergeVx.setVertexManagerPlugin(desc);
            return mergeVx;
        }
        return this.createVertex(conf, (ReduceWork)mergeJoinWork.getMainWork(), fs, mrScratchDir, ctx, localResources);
    }

    /**
     * Creates the Tez vertex for a map work.
     *
     * <p>The method caches the map work, creates its temp directories, decides the
     * input format and whether splits are grouped in the input initializer, and
     * builds the data source either for split generation in the AM (when
     * {@code HIVE_AM_SPLIT_GENERATION} is enabled) or from splits generated here in
     * memory. The vertex is then created with a {@link MapTezProcessor} (or
     * {@link MergeFileTezProcessor} for {@link MergeFileWork}) whose payload is the
     * serialized {@code conf}.
     *
     * @param conf vertex configuration; it is modified by this method
     * @param mapWork map work to turn into a vertex
     * @param fs file system (not used by this method)
     * @param mrScratchDir scratch directory used when caching and registering the work
     * @param ctx query context (not used by this method)
     * @param vertexType vertex type; decides whether the vertex has custom input
     * @param localResources local resources added to the vertex as task-local files
     * @return the created vertex
     * @throws Exception if any of the delegated steps fails
     */
    private Vertex createVertex(JobConf conf, MapWork mapWork, FileSystem fs, Path mrScratchDir, Context ctx, TezWork.VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
        DataSourceDescriptor dataSource;
        boolean groupSplitsInInputInitializer;
        Utilities.cacheMapWork((Configuration)conf, mapWork, mrScratchDir);
        Utilities.createTmpDirs((Configuration)conf, mapWork);
        Vertex map = null;
        // Stays -1 unless splits are generated here; NOTE(review): presumably lets Tez determine parallelism at runtime — confirm.
        int numTasks = -1;
        Class<HiveInputFormat> inputFormatClass = conf.getClass("mapred.input.format.class", InputFormat.class);
        boolean vertexHasCustomInput = TezWork.VertexType.isCustomInputType(vertexType);
        LOG.info("Vertex has custom input? " + vertexHasCustomInput);
        if (vertexHasCustomInput) {
            // Custom-input vertices never group splits in the initializer and always read through HiveInputFormat.
            groupSplitsInInputInitializer = false;
            inputFormatClass = HiveInputFormat.class;
            conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
            conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
        } else {
            // Splits are grouped in the initializer only when the configured format is exactly HiveInputFormat.
            groupSplitsInInputInitializer = inputFormatClass == HiveInputFormat.class;
        }
        if (mapWork instanceof MergeFileWork) {
            // Make sure the temporary output directory for the merge exists.
            Path outputPath = ((MergeFileWork)mapWork).getOutputDir();
            Path tempOutPath = Utilities.toTempPath(outputPath);
            try {
                FileSystem tmpOutFS = tempOutPath.getFileSystem((Configuration)conf);
                if (!tmpOutFS.exists(tempOutPath)) {
                    tmpOutFS.mkdirs(tempOutPath);
                }
            }
            catch (IOException e) {
                // NOTE(review): the message names outputPath although the directory being created is tempOutPath.
                throw new RuntimeException("Can't make path " + outputPath + " : " + e.getMessage(), e);
            }
        }
        conf.set("iocontext.input.name", mapWork.getName());
        if (HiveConf.getBoolVar((Configuration)conf, HiveConf.ConfVars.HIVE_AM_SPLIT_GENERATION)) {
            // The work is registered before the data source is built from conf.
            Utilities.setMapWork((Configuration)conf, mapWork, mrScratchDir, false);
            if (groupSplitsInInputInitializer) {
                InputInitializerDescriptor descriptor = InputInitializerDescriptor.create((String)HiveSplitGenerator.class.getName());
                dataSource = MRInputLegacy.createConfigBuilder((Configuration)conf, inputFormatClass).groupSplits(true).setCustomInitializerDescriptor(descriptor).build();
            } else {
                // MultiMRInput only for custom-input vertices of type MULTI_INPUT_UNINITIALIZED_EDGES; MRInputLegacy otherwise.
                dataSource = vertexHasCustomInput && vertexType == TezWork.VertexType.MULTI_INPUT_UNINITIALIZED_EDGES ? MultiMRInput.createConfigBuilder((Configuration)conf, inputFormatClass).groupSplits(false).build() : MRInputLegacy.createConfigBuilder((Configuration)conf, inputFormatClass).groupSplits(false).build();
            }
        } else {
            // Splits are generated here in memory and shipped inside the input payload.
            conf.setBoolean("VECTOR_MODE", mapWork.getVectorMode());
            conf.setBoolean("USE_VECTORIZED_INPUT_FILE_FORMAT", mapWork.getUseVectorizedInputFileFormat());
            InputSplitInfoMem inputSplitInfo = MRInputHelpers.generateInputSplitsToMem((Configuration)conf, (boolean)false, (int)0);
            InputInitializerDescriptor descriptor = InputInitializerDescriptor.create((String)MRInputSplitDistributor.class.getName());
            InputDescriptor inputDescriptor = (InputDescriptor)InputDescriptor.create((String)MRInputLegacy.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)MRRuntimeProtos.MRInputUserPayloadProto.newBuilder().setConfigurationBytes(TezUtils.createByteStringFromConf((Configuration)conf)).setSplits(inputSplitInfo.getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()));
            dataSource = DataSourceDescriptor.create((InputDescriptor)inputDescriptor, (InputInitializerDescriptor)descriptor, null);
            numTasks = inputSplitInfo.getNumTasks();
            // In this branch the work is registered only after the splits have been generated.
            Utilities.setMapWork((Configuration)conf, mapWork, mrScratchDir, false);
        }
        // conf is serialized here, so every setting applied above is part of the processor payload.
        UserPayload serializedConf = TezUtils.createUserPayloadFromConf((Configuration)conf);
        String procClassName = MapTezProcessor.class.getName();
        if (mapWork instanceof MergeFileWork) {
            procClassName = MergeFileTezProcessor.class.getName();
        }
        Vertex.VertexExecutionContext executionContext = this.createVertexExecutionContext(mapWork);
        map = Vertex.create((String)mapWork.getName(), (ProcessorDescriptor)((ProcessorDescriptor)ProcessorDescriptor.create((String)procClassName).setUserPayload(serializedConf)), (int)numTasks, (Resource)DagUtils.getContainerResource((Configuration)conf));
        map.setTaskEnvironment(this.getContainerEnvironment((Configuration)conf, true));
        map.setExecutionContext(executionContext);
        map.setTaskLaunchCmdOpts(DagUtils.getContainerJavaOpts((Configuration)conf));
        // The single alias of the work names the data source; the assertion is only checked when assertions are enabled.
        assert (mapWork.getAliasToWork().keySet().size() == 1);
        String alias = mapWork.getAliasToWork().keySet().iterator().next();
        map.addDataSource(alias, dataSource);
        map.addTaskLocalFiles(localResources);
        return map;
    }

    /**
     * Builds the job configuration for a reduce vertex as a copy of {@code baseConf}.
     *
     * <p>The copy receives the work name, the reducer class, and the reduce-side
     * speculative-execution flag taken from the Hive setting.
     *
     * @param baseConf configuration to copy; it is not modified
     * @param context query context (not used by this method)
     * @param reduceWork reduce work the vertex will execute
     * @return the new vertex-specific configuration
     */
    private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
        JobConf vertexConf = new JobConf(baseConf);
        vertexConf.set("__hive.context.name", reduceWork.getName());
        vertexConf.set("mapred.reducer.class", ExecReducer.class.getName());
        vertexConf.setBoolean(
            "mapreduce.reduce.speculative",
            HiveConf.getBoolVar(vertexConf, HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS));
        return vertexConf;
    }

    /**
     * Chooses where the tasks of a vertex execute.
     *
     * <p>Uber mode takes precedence and runs in the AM; otherwise LLAP mode selects
     * the {@code "LLAP"} execution context; otherwise tasks run in containers.
     *
     * @param work work whose LLAP and uber flags are consulted
     * @return the execution context for the vertex
     */
    private Vertex.VertexExecutionContext createVertexExecutionContext(BaseWork work) {
        boolean llapMode = work.getLlapMode();
        boolean uberMode = work.getUberMode();

        if (uberMode) {
            return Vertex.VertexExecutionContext.createExecuteInAm(true);
        }
        if (llapMode) {
            return Vertex.VertexExecutionContext.create("LLAP", "LLAP", "LLAP");
        }
        return Vertex.VertexExecutionContext.createExecuteInContainers(true);
    }

    /**
     * Creates the Tez vertex for a reduce work.
     *
     * <p>The work is registered in {@code conf}, its temp directories are created,
     * and the vertex is created with a {@link ReduceTezProcessor} whose payload is
     * the serialized {@code conf}. Parallelism is the maximum reduce-task count
     * when auto reduce parallelism is on, and the configured reduce-task count
     * otherwise.
     *
     * @param conf vertex configuration; it is modified by this method
     * @param reduceWork reduce work to turn into a vertex
     * @param fs file system (not used by this method)
     * @param mrScratchDir scratch directory used when registering the work
     * @param ctx query context (not used by this method)
     * @param localResources local resources added to the vertex as task-local files
     * @return the created vertex
     * @throws Exception if any of the delegated steps fails
     */
    private Vertex createVertex(JobConf conf, ReduceWork reduceWork, FileSystem fs, Path mrScratchDir, Context ctx, Map<String, LocalResource> localResources) throws Exception {
        conf.set("iocontext.input.name", reduceWork.getName());
        Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
        Utilities.createTmpDirs(conf, reduceWork);
        Vertex.VertexExecutionContext executionContext = this.createVertexExecutionContext(reduceWork);

        // Vertex ingredients, gathered in the order the vertex factory receives them.
        String vertexName = reduceWork.getName();
        ProcessorDescriptor processor = ProcessorDescriptor.create(ReduceTezProcessor.class.getName());
        processor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
        int parallelism;
        if (reduceWork.isAutoReduceParallelism()) {
            parallelism = reduceWork.getMaxReduceTasks();
        } else {
            parallelism = reduceWork.getNumReduceTasks().intValue();
        }
        Resource containerResource = DagUtils.getContainerResource(conf);

        Vertex reducer = Vertex.create(vertexName, processor, parallelism, containerResource);
        reducer.setTaskEnvironment(this.getContainerEnvironment(conf, false));
        reducer.setExecutionContext(executionContext);
        reducer.setTaskLaunchCmdOpts(DagUtils.getContainerJavaOpts(conf));
        reducer.addTaskLocalFiles(localResources);
        return reducer;
    }

    /**
     * Builds a map of local resources keyed by their base name.
     *
     * <p>The application jar is added first, followed by the additional resources;
     * a later resource with the same base name replaces an earlier one.
     *
     * @param appJarLr application jar resource, or {@code null} for none
     * @param additionalLr further resources, or {@code null} for none
     * @return a new mutable map from base name to resource
     */
    public static Map<String, LocalResource> createTezLrMap(LocalResource appJarLr, Collection<LocalResource> additionalLr) {
        Map<String, LocalResource> resourcesByName = new HashMap<String, LocalResource>();
        if (appJarLr != null) {
            resourcesByName.put(DagUtils.getBaseName(appJarLr), appJarLr);
        }
        if (additionalLr == null) {
            return resourcesByName;
        }
        for (LocalResource resource : additionalLr) {
            resourcesByName.put(DagUtils.getBaseName(resource), resource);
        }
        return resourcesByName;
    }

    /**
     * Builds a {@link LocalResource} record describing {@code file} on {@code remoteFs}.
     *
     * <p>The record carries the file's YARN URL, its length and its modification time as
     * reported by {@code remoteFs.getFileStatus(file)}.
     *
     * @param remoteFs file system that holds {@code file}
     * @param file path of the resource
     * @param type resource type to record
     * @param visibility resource visibility to record
     * @return the populated resource record
     * @throws java.io.UncheckedIOException if the file status cannot be read; previously the
     *         I/O error was only printed to stderr and the method then failed with a bare
     *         {@code NullPointerException} that hid the real cause
     */
    private LocalResource createLocalResource(FileSystem remoteFs, Path file, LocalResourceType type, LocalResourceVisibility visibility) {
        FileStatus fstat;
        try {
            fstat = remoteFs.getFileStatus(file);
        } catch (IOException e) {
            // Stay unchecked so the signature is unchanged for existing callers,
            // but keep the original failure as the cause.
            throw new java.io.UncheckedIOException("Unable to read file status of resource " + file, e);
        }
        URL resourceURL = ConverterUtils.getYarnUrlFromPath((Path)file);
        long resourceSize = fstat.getLen();
        long resourceModificationTime = fstat.getModificationTime();
        LOG.info("Resource modification time: " + resourceModificationTime + " for " + file);
        LocalResource lr = (LocalResource)Records.newRecord(LocalResource.class);
        lr.setResource(resourceURL);
        lr.setType(type);
        lr.setSize(resourceSize);
        lr.setVisibility(visibility);
        lr.setTimestamp(resourceModificationTime);
        return lr;
    }

    /**
     * Creates the vertex used to pre-warm containers.
     *
     * @param conf Tez configuration; serialized into the processor payload and used to derive
     *        container resource, launch options and environment
     * @param numContainers number of tasks (containers) the pre-warm vertex requests
     * @param localResources files to localize for the pre-warm tasks; may be {@code null}
     * @return the configured pre-warm vertex
     * @throws IOException if the configuration cannot be serialized into a payload
     * @throws TezException declared for callers; propagated from the Tez API
     */
    public PreWarmVertex createPreWarmVertex(TezConfiguration conf, int numContainers, Map<String, LocalResource> localResources) throws IOException, TezException {
        ProcessorDescriptor prewarmProcDescriptor = ProcessorDescriptor.create((String)HivePreWarmProcessor.class.getName());
        prewarmProcDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)conf));
        PreWarmVertex prewarmVertex = PreWarmVertex.create((String)"prewarm", (ProcessorDescriptor)prewarmProcDescriptor, (int)numContainers, (Resource)DagUtils.getContainerResource((Configuration)conf));
        // Copy into a fresh map so a null argument becomes an empty map and the caller's map
        // is never handed to Tez directly.
        Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
        if (localResources != null) {
            combinedResources.putAll(localResources);
        }
        // The null-safe copy was previously built and then ignored in favour of the raw
        // (possibly null) argument.
        prewarmVertex.addTaskLocalFiles(combinedResources);
        prewarmVertex.setTaskLaunchCmdOpts(DagUtils.getContainerJavaOpts((Configuration)conf));
        prewarmVertex.setTaskEnvironment(this.getContainerEnvironment((Configuration)conf, false));
        return prewarmVertex;
    }

    /**
     * Returns the default directory for uploaded Hive jars:
     * {@code <HIVE_USER_INSTALL_DIR>/<short user name>/.hiveJars}.
     *
     * <p>The per-user directory is created when it is missing, and the {@code .hiveJars}
     * directory is created on every call via {@code mkdirs}.
     *
     * @param conf configuration used to read the install dir and resolve the file system
     * @return path of the {@code .hiveJars} directory
     * @throws LoginException if the current user cannot be determined
     * @throws IOException if the per-user path exists but is not a directory, or on I/O failure
     */
    public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
        String shortUserName = Utils.getUGI().getShortUserName();
        String installRoot = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_USER_INSTALL_DIR);
        FileSystem installFs = new Path(installRoot).getFileSystem(conf);
        Path userDir = new Path(installRoot + "/" + shortUserName);
        try {
            boolean isDirectory = installFs.getFileStatus(userDir).isDir();
            if (!isDirectory) {
                throw new IOException(ErrorMsg.INVALID_DIR.format(userDir.toString()));
            }
        } catch (FileNotFoundException missing) {
            // First use for this user: create the per-user directory.
            installFs.mkdirs(userDir);
        }
        Path hiveJarsDir = new Path(userDir.toString() + "/.hiveJars");
        installFs.mkdirs(hiveJarsDir);
        return hiveJarsDir;
    }

    /**
     * Localizes every file, jar and archive resource named in the configuration.
     *
     * <p>When {@code HIVEADDFILESUSEHDFSLOCATION} is enabled, HDFS-resident files are
     * registered in place and only the local ones are uploaded; otherwise all files go through
     * the upload path. Archives are always handled through the upload path.
     *
     * @param hdfsDirPathStr destination directory for uploaded resources
     * @param conf configuration naming the resources
     * @return the resulting local resources, files first and archives last
     * @throws IOException on file system failure
     * @throws LoginException declared for callers
     */
    public List<LocalResource> localizeTempFilesFromConf(String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
        List<LocalResource> localized = new ArrayList<LocalResource>();
        boolean useHdfsLocation = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEADDFILESUSEHDFSLOCATION);
        if (!useHdfsLocation) {
            String[] allFiles = DagUtils.getTempFilesFromConf(conf);
            this.addTempResources(conf, localized, hdfsDirPathStr, LocalResourceType.FILE, allFiles, null);
        } else {
            String[] hdfsFiles = DagUtils.getHdfsTempFilesFromConf(conf);
            this.addHdfsResource(conf, localized, LocalResourceType.FILE, hdfsFiles);
            String[] localFiles = DagUtils.getLocalTempFilesFromConf(conf);
            this.addTempResources(conf, localized, hdfsDirPathStr, LocalResourceType.FILE, localFiles, null);
        }
        String[] archives = DagUtils.getTempArchivesFromConf(conf);
        this.addTempResources(conf, localized, hdfsDirPathStr, LocalResourceType.ARCHIVE, archives, null);
        return localized;
    }

    /**
     * Registers files that already live on a remote file system as PRIVATE local resources,
     * without copying them.
     *
     * @param conf configuration used to resolve each file's file system
     * @param tmpResources list the created resources are appended to
     * @param type resource type to record for every file
     * @param files file locations; blank entries are ignored
     * @throws IOException if a file system cannot be resolved
     */
    private void addHdfsResource(Configuration conf, List<LocalResource> tmpResources, LocalResourceType type, String[] files) throws IOException {
        for (int i = 0; i < files.length; ++i) {
            String location = files[i];
            if (StringUtils.isNotBlank(location)) {
                Path remotePath = new Path(location);
                FileSystem remoteFs = remotePath.getFileSystem(conf);
                tmpResources.add(this.createLocalResource(remoteFs, remotePath, type, LocalResourceVisibility.PRIVATE));
            }
        }
    }

    /**
     * Collects the HDFS-resident JAR and FILE resources named in the configuration.
     *
     * @param conf configuration to read
     * @return the comma-split entries, jars first then files; entries may be blank
     */
    private static String[] getHdfsTempFilesFromConf(Configuration conf) {
        String hdfsFiles = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.FILE);
        String hdfsJars = Utilities.getHdfsResourceFiles(conf, SessionState.ResourceType.JAR);
        return (hdfsJars + "," + hdfsFiles).split(",");
    }

    /**
     * Collects the local auxiliary jars, added jars and added files named in the configuration.
     *
     * @param conf configuration to read
     * @return the comma-split entries in the order aux jars, jars, files; entries may be blank
     */
    private static String[] getLocalTempFilesFromConf(Configuration conf) {
        String localFiles = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.FILE);
        String localJars = Utilities.getLocalResourceFiles(conf, SessionState.ResourceType.JAR);
        String auxiliaryJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
        String joined = auxiliaryJars + "," + localJars + "," + localFiles;
        return joined.split(",");
    }

    /**
     * Collects the auxiliary jars, added jars and added files named in the configuration.
     *
     * <p>As a side effect, non-blank added files and jars are written back to the configuration
     * under {@code HIVEADDEDFILES} and {@code HIVEADDEDJARS}.
     *
     * @param conf configuration to read and update; {@code null} yields an empty array
     * @return the comma-split entries in the order aux jars, jars, files; entries may be blank
     */
    public static String[] getTempFilesFromConf(Configuration conf) {
        if (conf == null) {
            return new String[0];
        }
        String sessionFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
        if (StringUtils.isNotBlank(sessionFiles)) {
            HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDFILES, sessionFiles);
        }
        String sessionJars = Utilities.getResourceFiles(conf, SessionState.ResourceType.JAR);
        if (StringUtils.isNotBlank(sessionJars)) {
            HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDJARS, sessionJars);
        }
        String auxiliaryJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
        return (auxiliaryJars + "," + sessionJars + "," + sessionFiles).split(",");
    }

    /**
     * Collects the archive resources named in the configuration.
     *
     * <p>When any are present they are also written back to the configuration under
     * {@code HIVEADDEDARCHIVES}.
     *
     * @param conf configuration to read and update
     * @return the comma-split archive entries, or an empty array when there are none
     */
    private static String[] getTempArchivesFromConf(Configuration conf) {
        String sessionArchives = Utilities.getResourceFiles(conf, SessionState.ResourceType.ARCHIVE);
        if (!StringUtils.isNotBlank(sessionArchives)) {
            return new String[0];
        }
        HiveConf.setVar(conf, HiveConf.ConfVars.HIVEADDEDARCHIVES, sessionArchives);
        return sessionArchives.split(",");
    }

    /**
     * Localizes the given jars as FILE resources, skipping any listed in {@code skipJars}.
     *
     * @param hdfsDirPathStr destination directory for uploaded resources
     * @param conf configuration used for file system access
     * @param inputOutputJars jars to localize; {@code null} makes the method return {@code null}
     * @param skipJars jars that must not be localized again; may be {@code null}
     * @return the localized resources, or {@code null} when {@code inputOutputJars} is {@code null}
     * @throws IOException on file system failure
     * @throws LoginException declared for callers
     */
    public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf, String[] inputOutputJars, String[] skipJars) throws IOException, LoginException {
        List<LocalResource> localized = null;
        if (inputOutputJars != null) {
            localized = new ArrayList<LocalResource>();
            this.addTempResources(conf, localized, hdfsDirPathStr, LocalResourceType.FILE, inputOutputJars, skipJars);
        }
        return localized;
    }

    /**
     * Localizes each non-blank entry of {@code files} into {@code hdfsDirPathStr} and appends the
     * resulting resources to {@code tmpResources}.
     *
     * @param conf configuration used for file system access
     * @param tmpResources list the created resources are appended to
     * @param hdfsDirPathStr destination directory; each file keeps its base name there
     * @param type resource type to record
     * @param files source files; blank entries are ignored
     * @param skipFiles files to leave out (compared as {@link Path}s); may be {@code null}
     * @throws IOException if localizing a file fails
     */
    private void addTempResources(Configuration conf, List<LocalResource> tmpResources, String hdfsDirPathStr, LocalResourceType type, String[] files, String[] skipFiles) throws IOException {
        Set<Path> alreadyInSession = null;
        if (skipFiles != null) {
            alreadyInSession = new HashSet<Path>();
            for (int i = 0; i < skipFiles.length; ++i) {
                if (!StringUtils.isBlank(skipFiles[i])) {
                    alreadyInSession.add(new Path(skipFiles[i]));
                }
            }
        }
        for (int i = 0; i < files.length; ++i) {
            String file = files[i];
            if (StringUtils.isBlank(file)) {
                continue;
            }
            Path sourcePath = new Path(file);
            if (alreadyInSession != null && alreadyInSession.contains(sourcePath)) {
                LOG.info("Skipping vertex resource " + file + " that already exists in the session");
                continue;
            }
            Path hdfsFilePath = new Path(hdfsDirPathStr, this.getResourceBaseName(sourcePath));
            tmpResources.add(this.localizeResource(sourcePath, hdfsFilePath, type, conf));
        }
    }

    /**
     * Resolves the directory that Hive jars are uploaded to.
     *
     * <p>The configured {@code HIVE_JAR_DIRECTORY} is used when it is set and is an existing
     * directory; otherwise the default destination directory is used.
     *
     * @param conf configuration to read
     * @return status of the chosen directory
     * @throws IOException if neither location is a valid directory, or on I/O failure
     * @throws LoginException if the current user cannot be determined for the default location
     */
    public FileStatus getHiveJarDirectory(Configuration conf) throws IOException, LoginException {
        FileStatus jarDirStatus = null;
        String configuredDir = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_JAR_DIRECTORY, (String)null);
        if (configuredDir != null) {
            LOG.info("Hive jar directory is " + configuredDir);
            jarDirStatus = DagUtils.validateTargetDir(new Path(configuredDir), conf);
        }
        if (jarDirStatus != null) {
            return jarDirStatus;
        }
        Path fallbackDir = this.getDefaultDestDir(conf);
        LOG.info("Jar dir is null / directory doesn't exist. Choosing HIVE_INSTALL_DIR - " + fallbackDir);
        jarDirStatus = DagUtils.validateTargetDir(fallbackDir, conf);
        if (jarDirStatus == null) {
            throw new IOException(ErrorMsg.NO_VALID_LOCATIONS.getMsg());
        }
        return jarDirStatus;
    }

    /**
     * Checks that {@code path} is an existing directory.
     *
     * @param path location to check
     * @param conf configuration used to resolve the file system
     * @return the directory's status, or {@code null} when the path is missing or not a directory
     * @throws IOException on I/O failure other than the path not being found
     */
    public static FileStatus validateTargetDir(Path path, Configuration conf) throws IOException {
        FileSystem targetFs = path.getFileSystem(conf);
        FileStatus status;
        try {
            status = targetFs.getFileStatus(path);
        } catch (FileNotFoundException notFound) {
            // A missing path is an expected outcome, reported as null.
            return null;
        }
        if (status == null || !status.isDir()) {
            return null;
        }
        return status;
    }

    /**
     * Returns the location this class was loaded from, as a URI string.
     *
     * <p>When the {@code HIVE_IN_TEST_IDE} flag is set and that location is a directory rather
     * than a jar, the URI of a freshly written empty archive is returned instead.
     *
     * @param configuration configuration holding the test-IDE flag
     * @return URI string of the exec jar (or of the empty stand-in archive)
     * @throws URISyntaxException if the code source location is not a valid URI
     */
    public String getExecJarPathLocal(Configuration configuration) throws URISyntaxException {
        URI codeLocation = DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI();
        boolean inTestIde = configuration.getBoolean(HiveConf.ConfVars.HIVE_IN_TEST_IDE.varname, false);
        if (inTestIde && new File(codeLocation.getPath()).isDirectory()) {
            return this.createEmptyArchive().toString();
        }
        return codeLocation.toString();
    }

    /**
     * Writes an empty zip archive named {@code empty.jar} into the directory given by the
     * {@code build.test.dir} system property and returns its URI.
     *
     * @return URI of the written archive
     * @throws RuntimeException wrapping the {@link IOException} if the archive cannot be written
     */
    private URI createEmptyArchive() {
        File outputJar = new File(System.getProperty("build.test.dir"), "empty.jar");
        // try-with-resources closes the file stream even when finishing the zip fails;
        // previously the FileOutputStream leaked in that case.
        try (FileOutputStream fileOut = new FileOutputStream(outputJar);
                ZipOutputStream zipOut = new ZipOutputStream(fileOut)) {
            zipOut.finish();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return outputJar.toURI();
    }

    /**
     * Returns the name portion of the file that {@code lr} points to, as computed by
     * {@code FilenameUtils.getName} on {@code lr.getResource().getFile()}.
     *
     * @param lr resource whose file name is wanted; must not be {@code null}
     * @return the value returned by {@code FilenameUtils.getName} for the resource's file
     */
    public static String getBaseName(LocalResource lr) {
        // NOTE(review): the qualifier before "FilenameUtils" is missing (the expression starts with a
        // bare '.'), as is the matching import at the top of the file. This looks like a
        // decompiler/shading artifact; restore the intended package before compiling.
        return .FilenameUtils.getName((String)lr.getResource().getFile());
    }

    /**
     * Returns the final component of {@code path}.
     *
     * @param path resource path; must not be {@code null}
     * @return the result of {@code path.getName()}
     */
    public String getResourceBaseName(Path path) {
        String baseName = path.getName();
        return baseName;
    }

    /**
     * Tells whether {@code dest} already exists with the same length as {@code src}.
     *
     * <p>Only the file lengths are compared; contents are not inspected.
     *
     * @param sourceFS file system holding {@code src}
     * @param src source file
     * @param dest destination file
     * @param conf configuration used to resolve the destination file system
     * @return {@code true} when {@code dest} exists and both lengths match
     * @throws IOException on file system failure
     */
    private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf) throws IOException {
        FileStatus existingDest = FileUtils.getFileStatusOrNull(dest.getFileSystem(conf), dest);
        if (existingDest == null) {
            return false;
        }
        long sourceLength = sourceFS.getFileStatus(src).getLen();
        return sourceLength == existingDest.getLen();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Makes sure {@code src} is available at {@code dest} and returns a PRIVATE
     * {@link LocalResource} describing {@code dest}.
     *
     * <p>The copy is skipped when {@code src} is {@code null} or when {@code dest} already exists
     * with the same length as {@code src}. Otherwise a notifier object is registered in
     * {@code copyNotifiers} under the source path string so that callers localizing the same
     * source can wait on each other. If the copy itself fails, the method waits (attempt count and
     * interval come from the configuration) for the destination to appear before giving up.
     *
     * @param src local source file, or {@code null} to only describe {@code dest}
     * @param dest destination path
     * @param type resource type to record
     * @param conf configuration used for file system access and wait settings
     * @return the resource record for {@code dest}
     * @throws IOException if the copy fails and the destination never appears, or on the
     *         KMS failure that is recognised by its exception message
     */
    public LocalResource localizeResource(Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException {
        FileSystem destFS = dest.getFileSystem(conf);
        LocalFileSystem srcFs = FileSystem.getLocal((Configuration)conf);
        // Nothing to upload when src is null or dest already exists with the same length.
        if (src != null && !this.checkPreExisting((FileSystem)srcFs, src, dest, conf)) {
            Object notifier;
            String srcStr = src.toString();
            LOG.info("Localizing resource because it does not exist: " + srcStr + " to dest: " + dest);
            Object notifierNew = new Object();
            // A non-null notifierOld means a notifier was already registered for this source,
            // i.e. another caller is presumably copying the same file right now.
            Object notifierOld = this.copyNotifiers.putIfAbsent(srcStr, notifierNew);
            Object object = notifier = notifierOld == null ? notifierNew : notifierOld;
            // Give the other caller one short chance (a single wait of up to 150 ms) to finish
            // before attempting the copy ourselves.
            if (notifierOld != null && this.checkOrWaitForTheFile((FileSystem)srcFs, src, dest, conf, notifierOld, 1, 150L, false)) {
                return this.createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
            }
            try {
                // NOTE(review): the two leading false flags are presumably delSrc and overwrite;
                // confirm against the Hadoop FileSystem API.
                destFS.copyFromLocalFile(false, false, src, dest);
                Object object2 = notifier;
                // Wake up any callers waiting on this notifier now that the copy has succeeded.
                synchronized (object2) {
                    notifier.notifyAll();
                }
                this.copyNotifiers.remove(srcStr, notifier);
            }
            catch (IOException e) {
                // This specific message is reported as a KMS failure instead of being treated as a
                // concurrent-writer conflict.
                if ("Exception while contacting value generator".equals(e.getMessage())) {
                    throw new IOException("copyFromLocalFile failed due to HDFS KMS failure", e);
                }
                LOG.info("Looks like another thread or process is writing the same file");
                int waitAttempts = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS);
                long sleepInterval = HiveConf.getTimeVar(conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS);
                // notifierOld may be null here, in which case the wait falls back to plain sleeping.
                if (!this.checkOrWaitForTheFile((FileSystem)srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
                    LOG.error("Could not find the jar that was being uploaded");
                    throw new IOException("Previous writer likely failed to write " + dest + ". Failing because I am unlikely to write too.");
                }
            }
            finally {
                // Always unregister the notifier this call registered, whatever the outcome.
                if (notifier == notifierNew) {
                    this.copyNotifiers.remove(srcStr, notifierNew);
                }
            }
        }
        return this.createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Polls until {@code dest} exists with the same length as {@code src}, or the attempts run out.
     *
     * <p>Between checks the method waits on {@code notifier} (so a {@code notifyAll} on it ends
     * the wait early) or, when {@code notifier} is {@code null}, sleeps for the interval. One
     * final check is made after the last wait.
     *
     * @param srcFs file system holding {@code src}
     * @param src source file
     * @param dest destination file being waited for
     * @param conf configuration used for file system access
     * @param notifier monitor to wait on, or {@code null} to sleep instead
     * @param waitAttempts maximum number of waits
     * @param sleepInterval length of each wait in milliseconds
     * @param doLog whether to log once before the first wait
     * @return {@code true} if the destination is present with a matching length
     * @throws IOException on file system failure, or wrapping an {@link InterruptedException}
     *         when the waiting thread is interrupted (the interrupt flag is restored first)
     */
    public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf, Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
        for (int attempt = 0; attempt < waitAttempts; ++attempt) {
            if (this.checkPreExisting(srcFs, src, dest, conf)) {
                return true;
            }
            if (doLog && attempt == 0) {
                LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with " + sleepInterval + "ms interval)");
            }
            try {
                if (notifier != null) {
                    synchronized (notifier) {
                        notifier.wait(sleepInterval);
                    }
                } else {
                    Thread.sleep(sleepInterval);
                }
            } catch (InterruptedException interrupted) {
                // Restore the interrupt flag so callers further up can still observe the
                // interruption after it is translated into an IOException.
                Thread.currentThread().interrupt();
                throw new IOException(interrupted);
            }
        }
        return this.checkPreExisting(srcFs, src, dest, conf);
    }

    /**
     * Derives the base {@link JobConf} for Tez execution from a {@link HiveConf}.
     *
     * <p>The old mapper API flag is forced off on {@code hiveConf} itself. The returned copy is
     * given the null output committer, Hive's output format and key/value classes and the
     * partitioner settings; the binary credentials entry is removed and hidden configurations
     * are stripped.
     *
     * @param hiveConf source configuration; note that it is modified
     * @return a new job configuration layered over a Tez configuration
     * @throws IOException declared for callers
     */
    public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
        hiveConf.setBoolean("mapred.mapper.new-api", false);
        TezConfiguration tezConf = new TezConfiguration((Configuration)hiveConf);
        JobConf jobConf = new JobConf((Configuration)tezConf);

        // Output handling: no committer work, Hive's own output format and key/value classes.
        String committerClass = HiveFileFormatUtils.NullOutputCommitter.class.getName();
        jobConf.set("mapred.output.committer.class", committerClass);
        jobConf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
        jobConf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
        jobConf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
        String keyClass = HiveKey.class.getName();
        jobConf.set("mapreduce.job.output.key.class", keyClass);
        String valueClass = BytesWritable.class.getName();
        jobConf.set("mapreduce.job.output.value.class", valueClass);

        // Partitioning: the Hive partitioner is taken from the configuration itself.
        String hivePartitioner = HiveConf.getVar((Configuration)jobConf, HiveConf.ConfVars.HIVEPARTITIONER);
        jobConf.set("mapred.partitioner.class", hivePartitioner);
        jobConf.set("tez.runtime.partitioner.class", MRPartitioner.class.getName());

        jobConf.unset("mapreduce.job.credentials.binary");
        hiveConf.stripHiddenConfigurations((Configuration)jobConf);
        return jobConf;
    }

    /**
     * Dispatches to the vertex-configuration overload that matches the concrete type of
     * {@code work}.
     *
     * @param conf base job configuration
     * @param context query context
     * @param work a {@code MapWork}, {@code ReduceWork} or {@code MergeJoinWork}
     * @return the vertex configuration, or {@code null} for any other work type (an assertion
     *         fails first when assertions are enabled)
     */
    public JobConf initializeVertexConf(JobConf conf, Context context, BaseWork work) {
        JobConf vertexConf = null;
        if (work instanceof MapWork) {
            vertexConf = this.initializeVertexConf(conf, context, (MapWork)work);
        } else if (work instanceof ReduceWork) {
            vertexConf = this.initializeVertexConf(conf, context, (ReduceWork)work);
        } else if (work instanceof MergeJoinWork) {
            vertexConf = this.initializeVertexConf(conf, context, (MergeJoinWork)work);
        } else {
            assert (false);
        }
        return vertexConf;
    }

    /**
     * Configures a merge-join vertex by delegating to the overload for its main work.
     *
     * @param conf base job configuration
     * @param context query context
     * @param work merge-join work; its main work is treated as {@code MapWork} when it is one
     *        and is cast to {@code ReduceWork} otherwise
     * @return the vertex configuration produced for the main work
     */
    private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
        BaseWork mainWork = work.getMainWork();
        boolean mapSide = mainWork instanceof MapWork;
        return mapSide
                ? this.initializeVertexConf(conf, context, (MapWork)mainWork)
                : this.initializeVertexConf(conf, context, (ReduceWork)mainWork);
    }

    /**
     * Creates the Tez vertex for {@code work}, choosing the overload by its concrete type.
     *
     * <p>For merge-join work with cross-product parent edges a cartesian-product vertex manager
     * is installed. When the work gathers stats, the stats publisher is initialised. A vertex
     * without children additionally gets an {@code MROutput} data sink named
     * {@code "out_" + work.getName()}.
     *
     * @param conf vertex job configuration
     * @param work the work to turn into a vertex
     * @param scratchDir scratch directory passed to the type-specific overload
     * @param fileSystem file system passed to the type-specific overload
     * @param ctx query context
     * @param hasChildren whether the vertex has downstream vertices
     * @param tezWork the enclosing work graph, used to inspect parent edges
     * @param vertexType vertex type passed to the map and merge-join overloads
     * @param localResources files to localize for the vertex's tasks
     * @return the created vertex
     * @throws Exception a {@code HiveException} for unsupported work types or when stats
     *         publishing cannot be initialised while reliable stats are required; anything the
     *         delegated calls throw
     */
    public Vertex createVertex(JobConf conf, BaseWork work, Path scratchDir, FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork, TezWork.VertexType vertexType, Map<String, LocalResource> localResources) throws Exception {
        Vertex vertex;
        if (work instanceof MapWork) {
            vertex = this.createVertex(conf, (MapWork)work, fileSystem, scratchDir, ctx, vertexType, localResources);
        } else if (work instanceof ReduceWork) {
            vertex = this.createVertex(conf, (ReduceWork)work, fileSystem, scratchDir, ctx, localResources);
        } else if (work instanceof MergeJoinWork) {
            vertex = this.createVertex(conf, (MergeJoinWork)work, fileSystem, scratchDir, ctx, vertexType, localResources);
            // Collect the parents connected through a cross-product edge.
            ArrayList<String> xprodParents = new ArrayList<String>();
            for (BaseWork parent : tezWork.getParents(work)) {
                if (tezWork.getEdgeType(parent, work) == TezEdgeProperty.EdgeType.XPROD_EDGE) {
                    xprodParents.add(parent.getName());
                }
            }
            if (!xprodParents.isEmpty()) {
                CartesianProductConfig productConfig = new CartesianProductConfig(xprodParents);
                VertexManagerPluginDescriptor productManager = (VertexManagerPluginDescriptor)VertexManagerPluginDescriptor.create((String)CartesianProductVertexManager.class.getName()).setUserPayload(productConfig.toUserPayload(new TezConfiguration((Configuration)conf)));
                vertex.setVertexManagerPlugin(productManager);
            }
        } else {
            throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
        }

        if (work.isGatheringStats()) {
            StatsFactory statsFactory = StatsFactory.newFactory((Configuration)conf);
            if (statsFactory != null) {
                StatsCollectionContext statsContext = new StatsCollectionContext((Configuration)conf);
                statsContext.setStatsTmpDirs(Utilities.getStatsTmpDirs(work, (Configuration)conf));
                StatsPublisher publisher = statsFactory.getStatsPublisher();
                boolean publisherReady = publisher.init(statsContext);
                // A failed publisher is only fatal when reliable stats are required.
                if (!publisherReady && HiveConf.getBoolVar((Configuration)conf, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
                    throw new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
                }
            }
        }

        if (!hasChildren) {
            String sinkName = "out_" + work.getName();
            OutputDescriptor sinkOutput = (OutputDescriptor)OutputDescriptor.create((String)MROutput.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf((Configuration)conf));
            vertex.addDataSink(sinkName, new DataSinkDescriptor(sinkOutput, null, null));
        }
        return vertex;
    }

    /**
     * Merges the current user's credentials into the DAG and then adds the work-specific
     * credentials for map or reduce work.
     *
     * @param work the work whose credentials are needed; other work types add nothing further
     * @param dag DAG whose credentials are extended
     * @throws IOException if the current user cannot be determined or a delegate fails
     */
    public void addCredentials(BaseWork work, DAG dag) throws IOException {
        dag.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
        if (work instanceof MapWork) {
            this.addCredentials((MapWork)work, dag);
            return;
        }
        if (work instanceof ReduceWork) {
            this.addCredentials((ReduceWork)work, dag);
        }
    }

    /**
     * Creates the Tez working directory below {@code scratchDir/<short user name>}.
     *
     * @param scratchDir base scratch directory
     * @param conf configuration used to resolve the file system
     * @return path of the created Tez directory
     * @throws IOException on file system failure, or wrapping a {@link LoginException} when the
     *         current user cannot be determined
     */
    public Path createTezDir(Path scratchDir, Configuration conf) throws IOException {
        String userName = System.getProperty("user.name");
        try {
            userName = Utils.getUGI().getShortUserName();
        } catch (LoginException loginFailure) {
            throw new IOException(loginFailure);
        }
        Path userScratchDir = new Path(scratchDir, userName);
        Path tezDir = this.getTezDir(userScratchDir);
        FileSystem tezFs = tezDir.getFileSystem(conf);
        LOG.debug("TezDir path set " + tezDir + " for user: " + userName);
        tezFs.mkdirs(tezDir);
        return tezDir;
    }

    /**
     * Returns the Tez sub-directory ({@code TEZ_DIR}) of the given scratch directory.
     *
     * @param scratchDir parent scratch directory
     * @return the child path; nothing is created on the file system
     */
    public Path getTezDir(Path scratchDir) {
        Path tezDir = new Path(scratchDir, TEZ_DIR);
        return tezDir;
    }

    /**
     * Returns the shared {@code DagUtils} instance held in the {@code instance} field.
     *
     * @return the shared instance
     */
    public static DagUtils getInstance() {
        return DagUtils.instance;
    }

    /**
     * Installs a shuffle vertex manager with auto-parallelism enabled when the edge asks for
     * automatic reducer parallelism; otherwise does nothing.
     *
     * @param edgeProp edge settings supplying the minimum reducer count and input size per reducer
     * @param v vertex that receives the vertex manager plugin
     * @throws IOException if the plugin configuration cannot be serialized
     */
    private void setupAutoReducerParallelism(TezEdgeProperty edgeProp, Vertex v) throws IOException {
        if (!edgeProp.isAutoReduce()) {
            return;
        }
        Configuration managerConf = new Configuration(false);
        managerConf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        managerConf.setInt("tez.shuffle-vertex-manager.min-task-parallelism", edgeProp.getMinReducer());
        managerConf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", edgeProp.getInputSizePerReducer());
        UserPayload managerPayload = TezUtils.createUserPayloadFromConf((Configuration)managerConf);

        VertexManagerPluginDescriptor manager = VertexManagerPluginDescriptor.create((String)ShuffleVertexManager.class.getName());
        manager.setUserPayload(managerPayload);
        v.setVertexManagerPlugin(manager);
    }

    /**
     * Installs a shuffle vertex manager whose minimum and maximum source fractions are both zero
     * when the edge is not marked as slow-start; otherwise does nothing.
     *
     * @param edgeProp edge settings carrying the slow-start flag
     * @param v vertex that receives the vertex manager plugin
     * @throws IOException if the plugin configuration cannot be serialized
     */
    private void setupQuickStart(TezEdgeProperty edgeProp, Vertex v) throws IOException {
        if (edgeProp.isSlowStart()) {
            return;
        }
        Configuration managerConf = new Configuration(false);
        managerConf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", 0.0f);
        managerConf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", 0.0f);
        UserPayload managerPayload = TezUtils.createUserPayloadFromConf((Configuration)managerConf);

        VertexManagerPluginDescriptor manager = VertexManagerPluginDescriptor.create((String)ShuffleVertexManager.class.getName());
        manager.setUserPayload(managerPayload);
        v.setVertexManagerPlugin(manager);
    }

    /**
     * Chooses the DAG name: the user-specified name when there is one, otherwise the query id.
     *
     * @param conf configuration that may carry a user-specified name
     * @param plan query plan supplying the fallback query id
     * @return the DAG name (asserted non-null when assertions are enabled)
     */
    public String createDagName(Configuration conf, QueryPlan plan) {
        String userSpecified = DagUtils.getUserSpecifiedDagName(conf);
        String dagName = userSpecified != null ? userSpecified : plan.getQueryId();
        assert (dagName != null);
        return dagName;
    }

    /**
     * Returns the DAG name requested by the user, if any.
     *
     * @param conf configuration to read
     * @return the {@code HIVEQUERYNAME} value when non-null, otherwise the value of
     *         {@code mapred.job.name} (which may itself be {@code null})
     */
    public static String getUserSpecifiedDagName(Configuration conf) {
        String queryName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYNAME);
        if (queryName == null) {
            return conf.get("mapred.job.name");
        }
        return queryName;
    }

    /**
     * Private constructor: instances cannot be created from outside this class; callers obtain
     * the shared instance through {@code getInstance()}.
     */
    private DagUtils() {
    }

    /**
     * Computes the Tez memory reserve fraction needed to satisfy {@code memoryRequested}.
     *
     * <p>An explicitly configured positive reserve fraction is returned as is. Otherwise the heap
     * size is taken from the rightmost {@code -Xmx} in the container Java options, falling back
     * to the configured heap fraction of the container size. The result is the configured
     * minimum fraction when that already covers the request; otherwise it is
     * {@code memoryRequested / heap}, limited to the configured minimum and maximum.
     *
     * @param memoryRequested memory wanted, in bytes
     * @param conf Hive configuration supplying fractions, container size and Java options
     * @return the reserve fraction to use
     */
    static double adjustMemoryReserveFraction(long memoryRequested, HiveConf conf) {
        float configuredFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION);
        if (configuredFraction > 0.0f) {
            return configuredFraction;
        }
        float heapFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_CONTAINER_MAX_JAVA_HEAP_FRACTION);
        float minFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MIN);
        float maxFraction = conf.getFloatVar(HiveConf.ConfVars.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION_MAX);

        Resource containerResource = DagUtils.getContainerResource(conf);
        long containerBytes = (long)containerResource.getMemory() * 1024L * 1024L;
        long heapBytes = DagUtils.parseRightmostXmx(DagUtils.getContainerJavaOpts(conf));
        if (heapBytes <= 0L) {
            // No usable -Xmx: estimate the heap from the container size.
            heapBytes = (long)(heapFraction * (float)containerBytes);
        }

        long reservedAtMinimum = (long)(minFraction * (float)heapBytes);
        if (reservedAtMinimum >= memoryRequested) {
            return minFraction;
        }
        LOG.warn("The actual amount of memory to be allocated " + reservedAtMinimum + " is less than the amount of requested memory for Map Join conversion " + memoryRequested);
        float neededFraction = (float)memoryRequested / (float)heapBytes;
        LOG.info("Fraction after calculation: " + neededFraction);
        if (neededFraction <= minFraction) {
            return minFraction;
        }
        if (neededFraction > minFraction && neededFraction < maxFraction) {
            LOG.info("Will adjust Tez setting tez.task.scale.memory.reserve-fraction to " + neededFraction + " to allocate more memory");
            return neededFraction;
        }
        return maxFraction;
    }

    /**
     * Extracts the heap size, in bytes, from the rightmost {@code -Xmx} option in a JVM option
     * string.
     *
     * <p>The option must be delimited by whitespace or the string boundaries and may carry a
     * {@code k}, {@code m} or {@code g} suffix in either case; without a suffix the number is
     * taken as bytes.
     *
     * @param javaOpts JVM options to scan; must not be {@code null}
     * @return the size in bytes, or {@code -1} when no well-formed positive {@code -Xmx} is found
     */
    static long parseRightmostXmx(String javaOpts) {
        Pattern xmxPattern = Pattern.compile(".*(?:^|\\s)-Xmx(\\d+)([gGmMkK]?)(?:$|\\s).*");
        Matcher xmxMatcher = xmxPattern.matcher(javaOpts);
        if (!xmxMatcher.matches()) {
            return -1L;
        }
        long size = Long.parseLong(xmxMatcher.group(1));
        if (size <= 0L) {
            return -1L;
        }
        String suffix = xmxMatcher.group(2);
        if (suffix.isEmpty()) {
            return size;
        }
        // The leading greedy ".*" makes the match land on the last -Xmx in the string.
        char unit = Character.toLowerCase(suffix.charAt(0));
        if (unit == 'k') {
            return size * 1024L;
        }
        if (unit == 'm') {
            return size * 1024L * 1024L;
        }
        if (unit == 'g') {
            return size * 1024L * 1024L * 1024L;
        }
        return -1L;
    }

    /**
     * Selects the resources of type {@code FILE} and indexes them by base file name.
     *
     * @param allNonAppResources candidate resources; {@code null} yields an empty map
     * @return a new mutable map from base name to resource, containing only FILE resources
     */
    public static Map<String, LocalResource> getResourcesUpdatableForAm(Collection<LocalResource> allNonAppResources) {
        Map<String, LocalResource> fileResources = new HashMap<String, LocalResource>();
        if (allNonAppResources != null) {
            for (LocalResource candidate : allNonAppResources) {
                if (candidate.getType() == LocalResourceType.FILE) {
                    fileResources.put(DagUtils.getBaseName(candidate), candidate);
                }
            }
        }
        return fileResources;
    }

    /**
     * Partitioner that assigns a record to a partition from the hash code of its value; the key
     * is ignored.
     */
    public static class ValueHashPartitioner
    implements Partitioner {
        /**
         * @param key ignored
         * @param value record value whose hash code selects the partition; must not be null
         * @param numPartitions number of partitions; must be positive
         * @return a partition index in {@code [0, numPartitions)}
         */
        public int getPartition(Object key, Object value, int numPartitions) {
            // Clearing the sign bit keeps the remainder non-negative.
            int nonNegativeHash = value.hashCode() & Integer.MAX_VALUE;
            return nonNegativeHash % numPartitions;
        }
    }
}

