package alluxio.client.block;

import alluxio.client.WriteType;
import alluxio.client.block.policy.BlockLocationPolicy;
import alluxio.client.block.policy.options.GetWorkerOptions;
import alluxio.client.block.stream.BlockInStream;
import alluxio.client.block.stream.BlockOutStream;
import alluxio.client.block.stream.DataWriter;
import alluxio.client.block.util.BlockLocationUtils;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.URIStatus;
import alluxio.client.file.options.InStreamOptions;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.collections.Pair;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.PreconditionMessage;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.exception.status.UnavailableException;
import alluxio.network.TieredIdentityFactory;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.collect.ImmutableMap;
import alluxio.shaded.client.com.google.common.collect.Lists;
import alluxio.shaded.client.com.google.common.collect.Sets;
import alluxio.shaded.client.javax.annotation.concurrent.ThreadSafe;
import alluxio.wire.BlockInfo;
import alluxio.wire.BlockLocation;
import alluxio.wire.TieredIdentity;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/client/block/BlockStoreClient.class */
public final class BlockStoreClient {
    private static final Logger LOG = LoggerFactory.getLogger(BlockStoreClient.class);
    private final FileSystemContext mContext;
    private final TieredIdentity mTieredIdentity;

    public static BlockStoreClient create(FileSystemContext fileSystemContext) {
        return new BlockStoreClient(fileSystemContext, TieredIdentityFactory.localIdentity(fileSystemContext.getClusterConf()));
    }

    @VisibleForTesting
    BlockStoreClient(FileSystemContext fileSystemContext, TieredIdentity tieredIdentity) {
        this.mContext = fileSystemContext;
        this.mTieredIdentity = tieredIdentity;
    }

    public BlockInfo getInfo(long j) throws IOException {
        CloseableResource<BlockMasterClient> acquireBlockMasterClientResource = this.mContext.acquireBlockMasterClientResource();
        Throwable th = null;
        try {
            try {
                BlockInfo blockInfo = acquireBlockMasterClientResource.get().getBlockInfo(j);
                if (acquireBlockMasterClientResource != null) {
                    if (0 != 0) {
                        try {
                            acquireBlockMasterClientResource.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        acquireBlockMasterClientResource.close();
                    }
                }
                return blockInfo;
            } finally {
            }
        } catch (Throwable th3) {
            if (acquireBlockMasterClientResource != null) {
                if (th != null) {
                    try {
                        acquireBlockMasterClientResource.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    acquireBlockMasterClientResource.close();
                }
            }
            throw th3;
        }
    }

    public BlockInStream getInStream(long j, InStreamOptions inStreamOptions) throws IOException {
        return getInStream(j, inStreamOptions, ImmutableMap.of());
    }

    public BlockInStream getInStream(long j, InStreamOptions inStreamOptions, Map<WorkerNetAddress, Long> map) throws IOException {
        return getInStream(getInfo(j), inStreamOptions, map);
    }

    public BlockInStream getInStream(BlockInfo blockInfo, InStreamOptions inStreamOptions, Map<WorkerNetAddress, Long> map) throws IOException {
        Pair<WorkerNetAddress, BlockInStream.BlockInStreamSource> dataSourceAndType = getDataSourceAndType(blockInfo, inStreamOptions.getStatus(), inStreamOptions.getUfsReadLocationPolicy(), map);
        WorkerNetAddress first = dataSourceAndType.getFirst();
        try {
            return BlockInStream.create(this.mContext, blockInfo, first, dataSourceAndType.getSecond(), inStreamOptions);
        } catch (UnavailableException e) {
            LOG.info("Added {} to failedWorkers for {}", first, e.toString());
            map.put(first, Long.valueOf(System.currentTimeMillis()));
            throw e;
        }
    }

    public Pair<WorkerNetAddress, BlockInStream.BlockInStreamSource> getDataSourceAndType(BlockInfo blockInfo, URIStatus uRIStatus, BlockLocationPolicy blockLocationPolicy, Map<WorkerNetAddress, Long> map) throws IOException {
        Set<WorkerNetAddress> set;
        List<BlockLocation> locations = blockInfo.getLocations();
        List<BlockWorkerInfo> emptyList = Collections.emptyList();
        if (uRIStatus.isPersisted() || uRIStatus.getPersistenceState().equals("TO_BE_PERSISTED")) {
            emptyList = this.mContext.getCachedWorkers();
            if (emptyList.isEmpty()) {
                throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
            }
            set = (Set) emptyList.stream().map((v0) -> {
                return v0.getNetAddress();
            }).collect(Collectors.toSet());
        } else {
            if (locations.isEmpty()) {
                if (this.mContext.getCachedWorkers().isEmpty()) {
                    throw new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
                }
                throw new UnavailableException(MessageFormat.format("Block {0} is unavailable in both Alluxio and UFS.", Long.valueOf(blockInfo.getBlockId())));
            }
            set = (Set) locations.stream().map((v0) -> {
                return v0.getWorkerAddress();
            }).collect(Collectors.toSet());
        }
        Set<WorkerNetAddress> handleFailedWorkers = handleFailedWorkers(set, map);
        BlockInStream.BlockInStreamSource blockInStreamSource = null;
        WorkerNetAddress workerNetAddress = null;
        List<BlockLocation> list = (List) locations.stream().filter(blockLocation -> {
            return handleFailedWorkers.contains(blockLocation.getWorkerAddress());
        }).collect(Collectors.toList());
        if (!list.isEmpty()) {
            List list2 = (List) list.stream().map((v0) -> {
                return v0.getWorkerAddress();
            }).collect(Collectors.toList());
            Collections.shuffle(list2);
            Optional<Pair<WorkerNetAddress, Boolean>> nearest = BlockLocationUtils.nearest(this.mTieredIdentity, list2, this.mContext.getClusterConf());
            if (nearest.isPresent()) {
                workerNetAddress = nearest.get().getFirst();
                blockInStreamSource = nearest.get().getSecond().booleanValue() ? this.mContext.hasProcessLocalWorker() ? BlockInStream.BlockInStreamSource.PROCESS_LOCAL : BlockInStream.BlockInStreamSource.NODE_LOCAL : BlockInStream.BlockInStreamSource.REMOTE;
            }
        }
        if (workerNetAddress == null) {
            blockInStreamSource = BlockInStream.BlockInStreamSource.UFS;
            Preconditions.checkNotNull(blockLocationPolicy, "The UFS read location policy is not specified");
            workerNetAddress = blockLocationPolicy.getWorker(GetWorkerOptions.defaults().setBlockInfo(new BlockInfo().setBlockId(blockInfo.getBlockId()).setLength(blockInfo.getLength()).setLocations(list)).setBlockWorkerInfos((List) emptyList.stream().filter(blockWorkerInfo -> {
                return handleFailedWorkers.contains(blockWorkerInfo.getNetAddress());
            }).collect(Collectors.toList()))).orElseThrow(() -> {
                return new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0]));
            });
            if (this.mContext.hasProcessLocalWorker() && workerNetAddress.equals(this.mContext.getNodeLocalWorker())) {
                blockInStreamSource = BlockInStream.BlockInStreamSource.PROCESS_LOCAL;
                LOG.debug("Create BlockInStream to read data from UFS through process local worker {}", workerNetAddress);
            } else {
                Logger logger = LOG;
                Object[] objArr = new Object[4];
                objArr[0] = workerNetAddress;
                objArr[1] = Boolean.valueOf(this.mContext.hasProcessLocalWorker());
                objArr[2] = Boolean.valueOf(this.mContext.hasNodeLocalWorker());
                objArr[3] = this.mContext.hasNodeLocalWorker() ? this.mContext.getNodeLocalWorker() : "N/A";
                logger.debug("Create BlockInStream to read data from UFS through worker {} (client embedded in local worker process: {},client co-located with worker in different processes: {}, local worker address: {})", objArr);
            }
        }
        return new Pair<>(workerNetAddress, blockInStreamSource);
    }

    private Set<WorkerNetAddress> handleFailedWorkers(Set<WorkerNetAddress> set, Map<WorkerNetAddress, Long> map) {
        if (set.isEmpty()) {
            return Collections.emptySet();
        }
        Set<WorkerNetAddress> set2 = (Set) set.stream().filter(workerNetAddress -> {
            return !map.containsKey(workerNetAddress);
        }).collect(Collectors.toSet());
        if (!set2.isEmpty()) {
            return set2;
        }
        Stream<WorkerNetAddress> stream = set.stream();
        map.getClass();
        return Collections.singleton(stream.min(Comparator.comparingLong((v1) -> {
            return r1.get(v1);
        })).get());
    }

    public BlockOutStream getOutStream(long j, long j2, WorkerNetAddress workerNetAddress, OutStreamOptions outStreamOptions) throws IOException {
        Preconditions.checkNotNull(workerNetAddress, "address");
        LOG.debug("Create BlockOutStream for {} of block size {} at address {}, using options: {}", new Object[]{Long.valueOf(j), Long.valueOf(j2), workerNetAddress, outStreamOptions});
        return new BlockOutStream(DataWriter.Factory.create(this.mContext, j, j2, workerNetAddress, outStreamOptions), j2, workerNetAddress);
    }

    public BlockOutStream getOutStream(long j, long j2, OutStreamOptions outStreamOptions) throws IOException {
        BlockLocationPolicy blockLocationPolicy = (BlockLocationPolicy) Preconditions.checkNotNull(outStreamOptions.getLocationPolicy(), PreconditionMessage.BLOCK_WRITE_LOCATION_POLICY_UNSPECIFIED);
        GetWorkerOptions blockWorkerInfos = GetWorkerOptions.defaults().setBlockInfo(new BlockInfo().setBlockId(j).setLength(j2)).setBlockWorkerInfos(new ArrayList(this.mContext.getCachedWorkers()));
        int replicationMin = (outStreamOptions.getWriteType() != WriteType.ASYNC_THROUGH || outStreamOptions.getReplicationDurable() <= outStreamOptions.getReplicationMin()) ? outStreamOptions.getReplicationMin() : outStreamOptions.getReplicationDurable();
        if (replicationMin <= 1) {
            return getOutStream(j, j2, blockLocationPolicy.getWorker(blockWorkerInfos).orElseThrow(() -> {
                try {
                    return this.mContext.getCachedWorkers().isEmpty() ? new UnavailableException(ExceptionMessage.NO_WORKER_AVAILABLE.getMessage(new Object[0])) : new UnavailableException(ExceptionMessage.NO_SPACE_FOR_BLOCK_ON_WORKER.getMessage(Long.valueOf(j2)));
                } catch (IOException e) {
                    return e;
                }
            }), outStreamOptions);
        }
        HashMap hashMap = new HashMap();
        for (BlockWorkerInfo blockWorkerInfo : blockWorkerInfos.getBlockWorkerInfos()) {
            String host = blockWorkerInfo.getNetAddress().getHost();
            if (hashMap.containsKey(host)) {
                ((Set) hashMap.get(host)).add(blockWorkerInfo);
            } else {
                hashMap.put(host, Sets.newHashSet(blockWorkerInfo));
            }
        }
        ArrayList arrayList = new ArrayList();
        ArrayList newArrayList = Lists.newArrayList(blockWorkerInfos.getBlockWorkerInfos());
        for (int i = 0; i < replicationMin; i++) {
            blockLocationPolicy.getWorker(blockWorkerInfos).ifPresent(workerNetAddress -> {
                arrayList.add(workerNetAddress);
                newArrayList.removeAll((Collection) hashMap.get(workerNetAddress.getHost()));
                blockWorkerInfos.setBlockWorkerInfos(newArrayList);
            });
        }
        if (arrayList.size() < replicationMin) {
            throw new ResourceExhaustedException(String.format("Not enough workers for replications, %d workers selected but %d required", Integer.valueOf(arrayList.size()), Integer.valueOf(replicationMin)));
        }
        return BlockOutStream.createReplicatedBlockOutStream(this.mContext, j, j2, arrayList, outStreamOptions);
    }

    public long getCapacityBytes() throws IOException {
        CloseableResource<BlockMasterClient> acquireBlockMasterClientResource = this.mContext.acquireBlockMasterClientResource();
        Throwable th = null;
        try {
            long capacityBytes = acquireBlockMasterClientResource.get().getCapacityBytes();
            if (acquireBlockMasterClientResource != null) {
                if (0 != 0) {
                    try {
                        acquireBlockMasterClientResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    acquireBlockMasterClientResource.close();
                }
            }
            return capacityBytes;
        } catch (Throwable th3) {
            if (acquireBlockMasterClientResource != null) {
                if (0 != 0) {
                    try {
                        acquireBlockMasterClientResource.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    acquireBlockMasterClientResource.close();
                }
            }
            throw th3;
        }
    }

    public long getUsedBytes() throws IOException {
        CloseableResource<BlockMasterClient> acquireBlockMasterClientResource = this.mContext.acquireBlockMasterClientResource();
        Throwable th = null;
        try {
            long usedBytes = acquireBlockMasterClientResource.get().getUsedBytes();
            if (acquireBlockMasterClientResource != null) {
                if (0 != 0) {
                    try {
                        acquireBlockMasterClientResource.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    acquireBlockMasterClientResource.close();
                }
            }
            return usedBytes;
        } catch (Throwable th3) {
            if (acquireBlockMasterClientResource != null) {
                if (0 != 0) {
                    try {
                        acquireBlockMasterClientResource.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    acquireBlockMasterClientResource.close();
                }
            }
            throw th3;
        }
    }
}
