package alluxio.job.plan.replicate;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockStoreClient;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.collections.Pair;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.status.NotFoundException;
import alluxio.grpc.RemoveBlockRequest;
import alluxio.job.JobConfig;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.resource.CloseableResource;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.BlockInfo;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A plan definition that adjusts the number of replicas of a single block towards the target
 * configured in {@link SetReplicaConfig}: surplus copies are evicted, missing copies are loaded.
 */
@NotThreadSafe
public final class SetReplicaDefinition extends AbstractVoidPlanDefinition<SetReplicaConfig, SetReplicaTask> {
    private static final Logger LOG = LoggerFactory.getLogger(SetReplicaDefinition.class);

    @Override // alluxio.job.plan.PlanDefinition
    public Class<SetReplicaConfig> getJobConfigClass() {
        return SetReplicaConfig.class;
    }

    /**
     * Picks the job workers that should run a {@link SetReplicaTask} for the configured block.
     *
     * <p>The current replica count is the number of locations reported for the block. When it
     * exceeds the target, workers whose host already holds the block are chosen with
     * {@link Mode#EVICT}; when it is below the target, workers whose host does not hold the block
     * are chosen with {@link Mode#REPLICATE}. At most {@code |current - target|} workers are
     * returned, picked in random order.
     *
     * @param setReplicaConfig the job configuration carrying the block id and target replica count
     * @param list the available job workers; must not be empty and is not modified
     * @param selectExecutorsContext the context providing the file system context
     * @return the selected (worker, task) pairs; empty when the target is already met
     * @throws IllegalArgumentException if {@code list} is empty or the target count is negative
     * @throws Exception if the block information cannot be fetched
     */
    @Override // alluxio.job.plan.PlanDefinition
    public Set<Pair<WorkerInfo, SetReplicaTask>> selectExecutors(SetReplicaConfig setReplicaConfig, List<WorkerInfo> list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        Preconditions.checkArgument(!list.isEmpty(), "No worker is available");
        long blockId = setReplicaConfig.getBlockId();
        int targetReplicas = setReplicaConfig.getReplicas();
        Preconditions.checkArgument(targetReplicas >= 0);

        BlockInfo blockInfo = BlockStoreClient.create(selectExecutorsContext.getFsContext()).getInfo(blockId);
        int currentReplicas = blockInfo.getLocations().size();
        Set<Pair<WorkerInfo, SetReplicaTask>> selected = new HashSet<>();
        if (targetReplicas == currentReplicas) {
            LOG.warn("Evict target has already been satisfied for job:{}", setReplicaConfig);
            return selected;
        }

        // The counts differ here, so exactly one of the two modes applies.
        Mode mode = currentReplicas > targetReplicas ? Mode.EVICT : Mode.REPLICATE;
        int tasksNeeded = Math.abs(currentReplicas - targetReplicas);

        // Block holders are matched to job workers by host name only.
        Set<String> hostsWithBlock = blockInfo.getLocations().stream()
                .map(location -> location.getWorkerAddress().getHost())
                .collect(Collectors.toSet());

        // Shuffle a private copy so the caller's list is neither reordered nor required to be mutable.
        List<WorkerInfo> candidates = new ArrayList<>(list);
        Collections.shuffle(candidates);
        for (WorkerInfo candidate : candidates) {
            boolean holdsBlock = hostsWithBlock.contains(candidate.getAddress().getHost());
            // Evict from hosts that hold the block; replicate onto hosts that do not.
            boolean eligible = (mode == Mode.EVICT) ? holdsBlock : !holdsBlock;
            if (!eligible) {
                continue;
            }
            selected.add(new Pair<>(candidate, new SetReplicaTask(mode)));
            if (selected.size() >= tasksNeeded) {
                break;
            }
        }
        return selected;
    }

    /**
     * Runs a single task by dispatching on its mode to eviction or replication.
     *
     * @param setReplicaConfig the job configuration
     * @param setReplicaTask the task carrying the mode to execute
     * @param runTaskContext the context of the running task
     * @return always {@code null}
     * @throws IllegalArgumentException if the task mode is neither EVICT nor REPLICATE
     * @throws Exception if the eviction or replication fails
     */
    @Override // alluxio.job.plan.PlanDefinition
    public SerializableVoid runTask(SetReplicaConfig setReplicaConfig, SetReplicaTask setReplicaTask, RunTaskContext runTaskContext) throws Exception {
        switch (setReplicaTask.getMode()) {
            case EVICT:
                evict(setReplicaConfig, runTaskContext);
                return null;
            case REPLICATE:
                replicate(setReplicaConfig, runTaskContext);
                return null;
            default:
                // String.format needs %s; an SLF4J-style {} placeholder would be printed literally.
                throw new IllegalArgumentException(String.format("Unexpected replication mode %s.", setReplicaTask.getMode()));
        }
    }

    /**
     * Removes the configured block from the block worker whose host equals this process's
     * WORKER_RPC connect host.
     *
     * <p>A {@link NotFoundException} raised by the remove request itself is logged and swallowed,
     * since the block is then already absent from that worker.
     *
     * @param setReplicaConfig the job configuration carrying the block id
     * @param runTaskContext the context providing the file system context
     * @throws NotFoundException if no cached block worker matches the local host
     * @throws Exception if acquiring the client or removing the block fails for another reason
     */
    private void evict(SetReplicaConfig setReplicaConfig, RunTaskContext runTaskContext) throws Exception {
        long blockId = setReplicaConfig.getBlockId();
        String localHost = NetworkAddressUtils.getConnectHost(NetworkAddressUtils.ServiceType.WORKER_RPC, ServerConfiguration.global());

        // Use the first cached worker whose host matches the local connect host.
        WorkerNetAddress localAddress = null;
        for (BlockWorkerInfo worker : runTaskContext.getFsContext().getCachedWorkers()) {
            if (worker.getNetAddress().getHost().equals(localHost)) {
                localAddress = worker.getNetAddress();
                break;
            }
        }
        if (localAddress == null) {
            throw new NotFoundException(String.format("Cannot find a local block worker to evict block %d", blockId));
        }

        RemoveBlockRequest request = RemoveBlockRequest.newBuilder().setBlockId(blockId).build();
        // try-with-resources releases the client on every path, including when removeBlock throws.
        try (CloseableResource<BlockWorkerClient> client = runTaskContext.getFsContext().acquireBlockWorkerClient(localAddress)) {
            client.get().removeBlock(request);
        } catch (NotFoundException e) {
            LOG.warn("Failed to delete block {} on {}: block does not exist", blockId, localAddress);
        }
    }

    /**
     * Loads the configured block through {@link JobUtils#loadBlock} using the status of the
     * configured file path, then logs the replication.
     *
     * @param setReplicaConfig the job configuration carrying the file path and block id
     * @param runTaskContext the context providing the file system and its context
     * @throws Exception if the file status cannot be fetched or the block cannot be loaded
     */
    private void replicate(SetReplicaConfig setReplicaConfig, RunTaskContext runTaskContext) throws Exception {
        // The trailing null/false arguments are passed unchanged; their meaning is defined by JobUtils.loadBlock.
        JobUtils.loadBlock(runTaskContext.getFileSystem().getStatus(new AlluxioURI(setReplicaConfig.getPath())), runTaskContext.getFsContext(), setReplicaConfig.getBlockId(), null, false);
        LOG.info("Replicated file {} block {}", setReplicaConfig.getPath(), setReplicaConfig.getBlockId());
    }
}
