package org.apache.flink.runtime.shuffle;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.AllTieredShuffleMasterSnapshots;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMasterSnapshot;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.util.ConfigurationParserUtils;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/shuffle/NettyShuffleMaster.class */
/**
 * {@link ShuffleMaster} implementation that hands out {@link NettyShuffleDescriptor}s.
 *
 * <p>When one of the hybrid batch shuffle modes is configured, a {@link TieredInternalShuffleMaster}
 * is created and most lifecycle calls (job registration, partition release, snapshot/restore,
 * close) are additionally delegated to it. Otherwise that delegate is {@code null} and those calls
 * are handled locally or are no-ops.
 *
 * <p>When job recovery is enabled and batch snapshots are supported, every registered partition's
 * descriptor is also recorded per job so that it can be looked up via {@link
 * #getShuffleDescriptor(JobID, ResultPartitionID)} and included in snapshots.
 */
public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> {
    /** Buffers per input channel used for the memory estimate; fixed at 2. */
    private final int buffersPerInputChannel;

    /** Floating buffers per gate used for the memory estimate; fixed at 8. */
    private final int floatingBuffersPerGate;

    /** Optional user-configured cap on required buffers per gate; must be at least 1 if set. */
    private final Optional<Integer> maxRequiredBuffersPerGate;

    /** Sort-shuffle minimum parallelism used for the memory estimate; fixed at 1. */
    private final int sortShuffleMinParallelism;

    /** Configured minimum number of buffers for sort shuffle. */
    private final int sortShuffleMinBuffers;

    /** Size of one network buffer in bytes, taken from the configured page size. */
    private final int networkBufferSize;

    /** Whether descriptors are recorded per job (job recovery enabled and snapshots supported). */
    private final boolean enableJobMasterFailover;

    /** Delegate for hybrid (tiered) shuffle; {@code null} when hybrid shuffle is not enabled. */
    @Nullable
    private final TieredInternalShuffleMaster tieredInternalShuffleMaster;

    /** Shuffle contexts of the currently registered jobs. */
    private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<>();

    /** Descriptors recorded per job; only populated when {@link #enableJobMasterFailover} is set. */
    private final Map<JobID, Map<ResultPartitionID, ShuffleDescriptor>> jobShuffleDescriptors =
            new HashMap<>();

    /**
     * Creates the shuffle master from the configuration carried by the given context.
     *
     * @param shuffleMasterContext context providing the configuration; it is also passed on to the
     *     tiered shuffle master when hybrid shuffle is enabled
     * @throws NullPointerException if the context returns a {@code null} configuration
     * @throws IllegalArgumentException if the configured maximum of required buffers per gate is
     *     smaller than 1
     */
    public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
        Configuration configuration = shuffleMasterContext.getConfiguration();
        Preconditions.checkNotNull(configuration);

        this.buffersPerInputChannel = 2;
        this.floatingBuffersPerGate = 8;
        this.maxRequiredBuffersPerGate =
                configuration.getOptional(
                        NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE);
        this.sortShuffleMinParallelism = 1;
        this.sortShuffleMinBuffers =
                configuration.get(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS);
        this.networkBufferSize = ConfigurationParserUtils.getPageSize(configuration);

        // Validate before the tiered master is constructed, so that an invalid configuration does
        // not leave behind a tiered master that nobody will ever close.
        Preconditions.checkArgument(
                !this.maxRequiredBuffersPerGate.isPresent()
                        || this.maxRequiredBuffersPerGate.get() >= 1,
                String.format(
                        "At least one buffer is required for each gate, please increase the value of %s.",
                        NettyShuffleEnvironmentOptions.NETWORK_READ_MAX_REQUIRED_BUFFERS_PER_GATE
                                .key()));

        this.tieredInternalShuffleMaster =
                isHybridShuffleEnabled(configuration)
                        ? new TieredInternalShuffleMaster(
                                shuffleMasterContext, this::getShuffleDescriptor)
                        : null;

        this.enableJobMasterFailover =
                configuration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED)
                        && supportsBatchSnapshot();
    }

    /**
     * Builds the descriptor for a partition and, if job recovery is active, records it for the job.
     *
     * @param jobID job owning the partition
     * @param partitionDescriptor description of the partition to register
     * @param producerDescriptor description of the producing execution and its location
     * @return an already completed future holding the new descriptor
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public CompletableFuture<NettyShuffleDescriptor> registerPartitionWithProducer(JobID jobID, PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) {
        ResultPartitionID resultPartitionID =
                new ResultPartitionID(
                        partitionDescriptor.getPartitionId(),
                        producerDescriptor.getProducerExecutionId());

        // Tier descriptors only exist in hybrid mode; otherwise null is passed through.
        List<TierShuffleDescriptor> tierShuffleDescriptors = null;
        if (this.tieredInternalShuffleMaster != null) {
            tierShuffleDescriptors =
                    this.tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(
                            jobID, partitionDescriptor.getNumberOfSubpartitions(), resultPartitionID);
        }

        NettyShuffleDescriptor nettyShuffleDescriptor =
                new NettyShuffleDescriptor(
                        producerDescriptor.getProducerLocation(),
                        createConnectionInfo(
                                producerDescriptor, partitionDescriptor.getConnectionIndex()),
                        resultPartitionID,
                        tierShuffleDescriptors);

        if (this.enableJobMasterFailover) {
            this.jobShuffleDescriptors
                    .computeIfAbsent(jobID, ignored -> new HashMap<>())
                    .put(resultPartitionID, nettyShuffleDescriptor);
        }
        return CompletableFuture.completedFuture(nettyShuffleDescriptor);
    }

    /**
     * Forwards the release to the tiered shuffle master; does nothing when hybrid shuffle is off.
     *
     * @param shuffleDescriptor descriptor of the partition to release
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.releasePartition(shuffleDescriptor);
        }
    }

    /**
     * Looks up a descriptor that was recorded for the given job.
     *
     * @param jobID job to look in
     * @param resultPartitionID partition to look up
     * @return the recorded descriptor, or empty if the job or the partition is not recorded
     */
    public Optional<ShuffleDescriptor> getShuffleDescriptor(JobID jobID, ResultPartitionID resultPartitionID) {
        return Optional.ofNullable(this.jobShuffleDescriptors.get(jobID))
                .map(descriptors -> descriptors.get(resultPartitionID));
    }

    /**
     * Chooses the connection info for a producer: network-based when the producer reports a
     * non-negative data port, the local-execution singleton otherwise.
     */
    private static NettyShuffleDescriptor.PartitionConnectionInfo createConnectionInfo(ProducerDescriptor producerDescriptor, int i) {
        if (producerDescriptor.getDataPort() >= 0) {
            return NettyShuffleDescriptor.NetworkPartitionConnectionInfo.fromProducerDescriptor(
                    producerDescriptor, i);
        }
        return NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo.INSTANCE;
    }

    /**
     * Computes the shuffle memory a task should announce: the number of network buffers computed by
     * {@link NettyShuffleUtils#computeNetworkBuffersForAnnouncing} times the network buffer size.
     *
     * @param taskInputsOutputsDescriptor description of the task's inputs and outputs
     * @return the memory size in bytes
     * @throws NullPointerException if the descriptor is {@code null}
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public MemorySize computeShuffleMemorySizeForTask(TaskInputsOutputsDescriptor taskInputsOutputsDescriptor) {
        Preconditions.checkNotNull(taskInputsOutputsDescriptor);

        long numRequiredNetworkBuffers =
                NettyShuffleUtils.computeNetworkBuffersForAnnouncing(
                        this.buffersPerInputChannel,
                        this.floatingBuffersPerGate,
                        this.maxRequiredBuffersPerGate,
                        this.sortShuffleMinParallelism,
                        this.sortShuffleMinBuffers,
                        taskInputsOutputsDescriptor.getInputChannelNums(),
                        taskInputsOutputsDescriptor.getPartitionReuseCount(),
                        taskInputsOutputsDescriptor.getSubpartitionNums(),
                        taskInputsOutputsDescriptor.getInputPartitionTypes(),
                        taskInputsOutputsDescriptor.getPartitionTypes());

        // Multiply as long: buffer size times buffer count can exceed Integer.MAX_VALUE.
        return new MemorySize((long) this.networkBufferSize * numRequiredNetworkBuffers);
    }

    /** Returns whether the configured batch shuffle mode is one of the two hybrid modes. */
    private boolean isHybridShuffleEnabled(Configuration configuration) {
        BatchShuffleMode shuffleMode = configuration.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
        return shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL
                || shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE;
    }

    /**
     * Fetches partitions with their metrics, via the tiered shuffle master in hybrid mode and
     * directly from the job's shuffle context otherwise.
     *
     * @param jobID job whose partitions are requested
     * @param duration timeout handed to the underlying call
     * @param set the expected partitions handed to the underlying call
     * @return future with the collected partitions
     * @throws NullPointerException in non-hybrid mode if the job is not registered
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public CompletableFuture<Collection<PartitionWithMetrics>> getPartitionWithMetrics(JobID jobID, Duration duration, Set<ResultPartitionID> set) {
        JobShuffleContext jobShuffleContext = this.jobShuffleContexts.get(jobID);
        if (this.tieredInternalShuffleMaster != null) {
            // The context is passed as-is (possibly null) to the tiered master.
            return this.tieredInternalShuffleMaster.getPartitionWithMetrics(
                    jobShuffleContext, duration, set);
        }
        return Preconditions.checkNotNull(jobShuffleContext).getPartitionWithMetrics(duration, set);
    }

    /**
     * Remembers the job's shuffle context and registers the job with the tiered shuffle master if
     * there is one.
     *
     * @param jobShuffleContext context of the job being registered
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void registerJob(JobShuffleContext jobShuffleContext) {
        this.jobShuffleContexts.put(jobShuffleContext.getJobId(), jobShuffleContext);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.registerJob(jobShuffleContext);
        }
    }

    /**
     * Forgets everything kept for the job and unregisters it from the tiered shuffle master if
     * there is one.
     *
     * @param jobID job to unregister
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void unregisterJob(JobID jobID) {
        this.jobShuffleContexts.remove(jobID);
        // Dropped regardless of the shuffle mode: descriptors are recorded whenever job recovery is
        // active, which can also be the case without a tiered master, so removing them only in
        // hybrid mode would leave the finished job's entry in the map.
        this.jobShuffleDescriptors.remove(jobID);
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.unregisterJob(jobID);
        }
    }

    /**
     * Reports batch snapshot support: the tiered shuffle master's answer in hybrid mode, {@code
     * true} otherwise.
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public boolean supportsBatchSnapshot() {
        if (this.tieredInternalShuffleMaster != null) {
            return this.tieredInternalShuffleMaster.supportsBatchSnapshot();
        }
        return true;
    }

    /**
     * Snapshots the state of one job.
     *
     * <p>Without a tiered master the future is completed with the empty snapshot. Otherwise the
     * descriptors recorded for the job so far are removed from this master and packed, together
     * with the tiered master's snapshot, into a {@link TieredInternalShuffleMasterSnapshot}.
     *
     * @param completableFuture future to complete with the snapshot, or exceptionally if the tiered
     *     snapshot fails
     * @param shuffleMasterSnapshotContext context forwarded to the tiered master
     * @param jobID job to snapshot
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void snapshotState(CompletableFuture<ShuffleMasterSnapshot> completableFuture, ShuffleMasterSnapshotContext shuffleMasterSnapshotContext, JobID jobID) {
        if (this.tieredInternalShuffleMaster == null) {
            completableFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
            return;
        }
        Map<ResultPartitionID, ShuffleDescriptor> recordedDescriptors =
                this.jobShuffleDescriptors.remove(jobID);
        CompletableFuture<AllTieredShuffleMasterSnapshots> tieredSnapshotFuture =
                new CompletableFuture<>();
        this.tieredInternalShuffleMaster.snapshotState(
                tieredSnapshotFuture, shuffleMasterSnapshotContext, jobID);
        forwardTieredSnapshot(tieredSnapshotFuture, recordedDescriptors, completableFuture);
    }

    /**
     * Snapshots the job-independent state.
     *
     * <p>Without a tiered master the future is completed with the empty snapshot. Otherwise it is
     * completed with a {@link TieredInternalShuffleMasterSnapshot} that carries the tiered master's
     * snapshot and no shuffle descriptors.
     *
     * @param completableFuture future to complete with the snapshot, or exceptionally if the tiered
     *     snapshot fails
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void snapshotState(CompletableFuture<ShuffleMasterSnapshot> completableFuture) {
        if (this.tieredInternalShuffleMaster == null) {
            completableFuture.complete(EmptyShuffleMasterSnapshot.getInstance());
            return;
        }
        CompletableFuture<AllTieredShuffleMasterSnapshots> tieredSnapshotFuture =
                new CompletableFuture<>();
        this.tieredInternalShuffleMaster.snapshotState(tieredSnapshotFuture);
        forwardTieredSnapshot(tieredSnapshotFuture, null, completableFuture);
    }

    /**
     * Completes {@code snapshotFuture} from the outcome of the tiered snapshot: with a combined
     * snapshot on success, exceptionally on failure so that the caller is not left waiting.
     */
    private static void forwardTieredSnapshot(
            CompletableFuture<AllTieredShuffleMasterSnapshots> tieredSnapshotFuture,
            @Nullable Map<ResultPartitionID, ShuffleDescriptor> shuffleDescriptors,
            CompletableFuture<ShuffleMasterSnapshot> snapshotFuture) {
        tieredSnapshotFuture.whenComplete(
                (tieredSnapshots, failure) -> {
                    if (failure != null) {
                        snapshotFuture.completeExceptionally(failure);
                    } else {
                        snapshotFuture.complete(
                                new TieredInternalShuffleMasterSnapshot(
                                        shuffleDescriptors, tieredSnapshots));
                    }
                });
    }

    /**
     * Restores job-independent state into the tiered shuffle master; a no-op without one.
     *
     * @param shuffleMasterSnapshot snapshot to restore
     * @throws IllegalStateException in hybrid mode if the snapshot is not a {@link
     *     TieredInternalShuffleMasterSnapshot}
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void restoreState(ShuffleMasterSnapshot shuffleMasterSnapshot) {
        if (this.tieredInternalShuffleMaster != null) {
            Preconditions.checkState(
                    shuffleMasterSnapshot instanceof TieredInternalShuffleMasterSnapshot);
            this.tieredInternalShuffleMaster.restoreState(
                    (TieredInternalShuffleMasterSnapshot) shuffleMasterSnapshot);
        }
    }

    /**
     * Restores one job from a list of snapshots; a no-op without a tiered master.
     *
     * <p>The shuffle descriptors carried by each snapshot are merged, in list order, into the
     * descriptors recorded for the job; afterwards the snapshots are handed to the tiered master.
     *
     * @param list snapshots to restore from
     * @param jobID job being restored
     * @throws IllegalStateException if a snapshot is not a {@link
     *     TieredInternalShuffleMasterSnapshot}
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void restoreState(List<ShuffleMasterSnapshot> list, JobID jobID) {
        if (this.tieredInternalShuffleMaster == null) {
            return;
        }
        List<TieredInternalShuffleMasterSnapshot> tieredSnapshots = new ArrayList<>(list.size());
        for (ShuffleMasterSnapshot snapshot : list) {
            Preconditions.checkState(snapshot instanceof TieredInternalShuffleMasterSnapshot);
            TieredInternalShuffleMasterSnapshot tieredSnapshot =
                    (TieredInternalShuffleMasterSnapshot) snapshot;
            Map<ResultPartitionID, ShuffleDescriptor> restoredDescriptors =
                    tieredSnapshot.getShuffleDescriptors();
            // Snapshots taken without a job carry no descriptors.
            if (restoredDescriptors != null) {
                this.jobShuffleDescriptors
                        .computeIfAbsent(jobID, ignored -> new HashMap<>())
                        .putAll(restoredDescriptors);
            }
            tieredSnapshots.add(tieredSnapshot);
        }
        this.tieredInternalShuffleMaster.restoreState(tieredSnapshots, jobID);
    }

    /**
     * Notifies the job's shuffle context that partition recovery has started.
     *
     * @param jobID job whose recovery started
     * @throws NullPointerException if the job is not registered
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster
    public void notifyPartitionRecoveryStarted(JobID jobID) {
        Preconditions.checkNotNull(this.jobShuffleContexts.get(jobID))
                .notifyPartitionRecoveryStarted();
    }

    /** Closes the tiered shuffle master if there is one. */
    @Override // org.apache.flink.runtime.shuffle.ShuffleMaster, java.lang.AutoCloseable
    public void close() throws Exception {
        if (this.tieredInternalShuffleMaster != null) {
            this.tieredInternalShuffleMaster.close();
        }
    }
}
