package org.apache.flink.runtime.io.network;

import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.metrics.NettyShuffleMetricFactory;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartitionFactory;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
import org.apache.flink.runtime.shuffle.ShuffleServiceFactory;
import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.class */
/**
 * {@link ShuffleServiceFactory} that produces the Netty based shuffle components: a
 * {@link NettyShuffleMaster} and a {@link NettyShuffleEnvironment}.
 *
 * <p>The static {@code createNettyShuffleEnvironment} overloads form a chain: each one fills in
 * one more default collaborator (result partition manager, then connection manager) and delegates
 * to the next, ending in the overload that takes every collaborator explicitly.
 */
public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettyShuffleDescriptor, ResultPartition, SingleInputGate> {
    private static final Logger LOG = LoggerFactory.getLogger(NettyShuffleServiceFactory.class);

    /** Prefix passed to the {@link FileChannelManagerImpl} created for every shuffle environment. */
    private static final String DIR_NAME_PREFIX = "netty-shuffle";

    /**
     * Creates the shuffle master for the given context.
     *
     * @param shuffleMasterContext context handed to the {@link NettyShuffleMaster} constructor
     * @return a new {@link NettyShuffleMaster}
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleServiceFactory
    /* renamed from: createShuffleMaster, reason: merged with bridge method [inline-methods] */
    public ShuffleMaster<NettyShuffleDescriptor> createShuffleMaster2(ShuffleMasterContext shuffleMasterContext) {
        return new NettyShuffleMaster(shuffleMasterContext);
    }

    /**
     * Creates the shuffle environment from the values exposed by the given context.
     *
     * @param shuffleEnvironmentContext source of the configuration, network memory size, host
     *     address, resource id, event publisher, metric group, executors, slot count and tmp dirs
     * @return a new {@link NettyShuffleEnvironment}
     * @throws NullPointerException if {@code shuffleEnvironmentContext} is {@code null}
     */
    @Override // org.apache.flink.runtime.shuffle.ShuffleServiceFactory
    /* renamed from: createShuffleEnvironment, reason: merged with bridge method [inline-methods] */
    public ShuffleEnvironment<ResultPartition, SingleInputGate> createShuffleEnvironment2(ShuffleEnvironmentContext shuffleEnvironmentContext) {
        Preconditions.checkNotNull(shuffleEnvironmentContext);
        NettyShuffleEnvironmentConfiguration networkConfig =
                NettyShuffleEnvironmentConfiguration.fromConfiguration(
                        shuffleEnvironmentContext.getConfiguration(),
                        shuffleEnvironmentContext.getNetworkMemorySize(),
                        shuffleEnvironmentContext.isLocalCommunicationOnly(),
                        shuffleEnvironmentContext.getHostAddress());
        return createNettyShuffleEnvironment(
                networkConfig,
                shuffleEnvironmentContext.getTaskExecutorResourceId(),
                shuffleEnvironmentContext.getEventPublisher(),
                shuffleEnvironmentContext.getParentMetricGroup(),
                shuffleEnvironmentContext.getIoExecutor(),
                shuffleEnvironmentContext.getScheduledExecutor(),
                shuffleEnvironmentContext.getNumberOfSlots(),
                shuffleEnvironmentContext.getTmpDirPaths());
    }

    /**
     * Creates a shuffle environment with a freshly built {@link ResultPartitionManager} that uses
     * the configured partition request listener timeout and the given scheduled executor.
     *
     * @param i number of slots (the production caller passes the context's number of slots)
     * @param strArr tmp directory paths (the production caller passes the context's tmp dir paths)
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, ResourceID resourceID, TaskEventPublisher taskEventPublisher, MetricGroup metricGroup, Executor executor, ScheduledExecutor scheduledExecutor, int i, String[] strArr) {
        ResultPartitionManager partitionManager =
                new ResultPartitionManager(
                        nettyShuffleEnvironmentConfiguration.getPartitionRequestListenerTimeout(),
                        scheduledExecutor);
        return createNettyShuffleEnvironment(
                nettyShuffleEnvironmentConfiguration,
                resourceID,
                taskEventPublisher,
                partitionManager,
                metricGroup,
                executor,
                i,
                strArr);
    }

    /**
     * Creates a shuffle environment, choosing the connection manager from the configuration: a
     * {@link NettyConnectionManager} when a {@link NettyConfig} is present, otherwise a
     * {@link LocalConnectionManager}.
     *
     * @param i number of slots
     * @param strArr tmp directory paths
     */
    @VisibleForTesting
    static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, ResourceID resourceID, TaskEventPublisher taskEventPublisher, ResultPartitionManager resultPartitionManager, MetricGroup metricGroup, Executor executor, int i, String[] strArr) {
        NettyConfig nettyConfig = nettyShuffleEnvironmentConfiguration.nettyConfig();
        ConnectionManager connectionManager;
        if (nettyConfig != null) {
            connectionManager =
                    new NettyConnectionManager(
                            resultPartitionManager,
                            taskEventPublisher,
                            nettyConfig,
                            nettyShuffleEnvironmentConfiguration.isConnectionReuseEnabled());
        } else {
            connectionManager = new LocalConnectionManager();
        }
        return createNettyShuffleEnvironment(
                nettyShuffleEnvironmentConfiguration,
                resourceID,
                taskEventPublisher,
                resultPartitionManager,
                connectionManager,
                metricGroup,
                executor,
                i,
                strArr);
    }

    /**
     * Creates a shuffle environment from explicitly supplied collaborators.
     *
     * <p>All argument validation happens before any resource is created, so an invalid argument
     * never leaves a half-built file channel manager or buffer pool behind.
     *
     * @param nettyShuffleEnvironmentConfiguration shuffle configuration; must not be {@code null}
     * @param resourceID resource id forwarded to the environment and the input gate factory; must
     *     not be {@code null}
     * @param taskEventPublisher must not be {@code null}
     * @param resultPartitionManager must not be {@code null}
     * @param connectionManager must not be {@code null}
     * @param metricGroup group the shuffle metrics are registered on; must not be {@code null}
     * @param executor executor forwarded unchanged to the {@link NettyShuffleEnvironment}
     * @param i number of slots; used only to size the batch shuffle read I/O thread pool
     * @param strArr tmp directory paths; only its length is used, to size the batch shuffle read
     *     I/O thread pool; must not be {@code null}
     * @return the assembled {@link NettyShuffleEnvironment}
     * @throws NullPointerException if any argument documented as non-null is {@code null}
     */
    @VisibleForTesting
    public static NettyShuffleEnvironment createNettyShuffleEnvironment(NettyShuffleEnvironmentConfiguration nettyShuffleEnvironmentConfiguration, ResourceID resourceID, TaskEventPublisher taskEventPublisher, ResultPartitionManager resultPartitionManager, ConnectionManager connectionManager, MetricGroup metricGroup, Executor executor, int i, String[] strArr) {
        Preconditions.checkNotNull(nettyShuffleEnvironmentConfiguration);
        Preconditions.checkNotNull(resourceID);
        Preconditions.checkNotNull(taskEventPublisher);
        Preconditions.checkNotNull(resultPartitionManager);
        Preconditions.checkNotNull(metricGroup);
        Preconditions.checkNotNull(connectionManager);
        // strArr.length is read further down, after the file channel manager and both buffer
        // pools exist. Checking here keeps the failure a NullPointerException but raises it
        // before those resources are created.
        Preconditions.checkNotNull(strArr);

        FileChannelManagerImpl fileChannelManager =
                new FileChannelManagerImpl(nettyShuffleEnvironmentConfiguration.getTempDirs(), DIR_NAME_PREFIX);
        if (LOG.isInfoEnabled()) {
            // Guarded so the path list is only joined when it will actually be logged.
            String usedDirectories =
                    Arrays.stream(fileChannelManager.getPaths())
                            .map(path -> path.getAbsolutePath())
                            .collect(Collectors.joining("\n\t"));
            LOG.info(
                    "Created a new {} for storing result partitions of BLOCKING shuffles. Used directories:\n\t{}",
                    FileChannelManager.class.getSimpleName(),
                    usedDirectories);
        }

        NetworkBufferPool networkBufferPool =
                new NetworkBufferPool(
                        nettyShuffleEnvironmentConfiguration.numNetworkBuffers(),
                        nettyShuffleEnvironmentConfiguration.networkBufferSize(),
                        nettyShuffleEnvironmentConfiguration.getRequestSegmentsTimeout());
        BatchShuffleReadBufferPool batchShuffleReadBufferPool =
                new BatchShuffleReadBufferPool(
                        nettyShuffleEnvironmentConfiguration.batchShuffleReadMemoryBytes(),
                        nettyShuffleEnvironmentConfiguration.networkBufferSize());
        ScheduledExecutorService batchShuffleReadIOExecutor =
                createBatchShuffleReadIOExecutor(batchShuffleReadBufferPool, i, strArr);

        NettyShuffleMetricFactory.registerShuffleMetrics(metricGroup, networkBufferPool);

        // The tiered storage collaborators stay null unless a tiered storage configuration is
        // present; the factories below receive them either way.
        TieredStorageConfiguration tieredStorageConfiguration =
                nettyShuffleEnvironmentConfiguration.getTieredStorageConfiguration();
        TieredResultPartitionFactory tieredResultPartitionFactory = null;
        TieredStorageNettyServiceImpl tieredStorageNettyService = null;
        if (tieredStorageConfiguration != null) {
            TieredStorageResourceRegistry tieredStorageResourceRegistry = new TieredStorageResourceRegistry();
            tieredStorageNettyService = new TieredStorageNettyServiceImpl(tieredStorageResourceRegistry);
            tieredResultPartitionFactory =
                    new TieredResultPartitionFactory(
                            tieredStorageConfiguration,
                            tieredStorageNettyService,
                            tieredStorageResourceRegistry);
        }

        ResultPartitionFactory resultPartitionFactory =
                new ResultPartitionFactory(
                        resultPartitionManager,
                        fileChannelManager,
                        networkBufferPool,
                        batchShuffleReadBufferPool,
                        batchShuffleReadIOExecutor,
                        nettyShuffleEnvironmentConfiguration.getBlockingSubpartitionType(),
                        nettyShuffleEnvironmentConfiguration.networkBuffersPerChannel(),
                        nettyShuffleEnvironmentConfiguration.floatingNetworkBuffersPerGate(),
                        nettyShuffleEnvironmentConfiguration.networkBufferSize(),
                        nettyShuffleEnvironmentConfiguration.startingBufferSize(),
                        nettyShuffleEnvironmentConfiguration.isBatchShuffleCompressionEnabled(),
                        nettyShuffleEnvironmentConfiguration.getCompressionCodec(),
                        nettyShuffleEnvironmentConfiguration.getMaxBuffersPerChannel(),
                        nettyShuffleEnvironmentConfiguration.sortShuffleMinBuffers(),
                        nettyShuffleEnvironmentConfiguration.sortShuffleMinParallelism(),
                        nettyShuffleEnvironmentConfiguration.isSSLEnabled(),
                        nettyShuffleEnvironmentConfiguration.getMaxOverdraftBuffersPerGate(),
                        tieredResultPartitionFactory);
        SingleInputGateFactory singleInputGateFactory =
                new SingleInputGateFactory(
                        resourceID,
                        nettyShuffleEnvironmentConfiguration,
                        connectionManager,
                        resultPartitionManager,
                        taskEventPublisher,
                        networkBufferPool,
                        tieredStorageConfiguration,
                        tieredStorageNettyService);

        return new NettyShuffleEnvironment(
                resourceID,
                nettyShuffleEnvironmentConfiguration,
                networkBufferPool,
                connectionManager,
                resultPartitionManager,
                fileChannelManager,
                resultPartitionFactory,
                singleInputGateFactory,
                executor,
                batchShuffleReadBufferPool,
                batchShuffleReadIOExecutor);
    }

    /**
     * Builds the scheduled thread pool used for batch shuffle reads, with threads created by an
     * {@link ExecutorThreadFactory} named {@code "blocking-shuffle-io"}.
     *
     * <p>The pool size is the smaller of the read buffer pool's maximum concurrent requests and
     * {@code max(numberOfSlots, tmpDirPaths.length)}, but never less than one.
     */
    private static ScheduledExecutorService createBatchShuffleReadIOExecutor(BatchShuffleReadBufferPool readBufferPool, int numberOfSlots, String[] tmpDirPaths) {
        // NOTE(review): upstream Apache Flink is believed to use 4 * max(slots, tmpDirs) as this
        // bound - TODO confirm whether the missing factor here is intentional.
        int slotOrDirBound = Math.max(numberOfSlots, tmpDirPaths.length);
        int poolSize = Math.max(1, Math.min(readBufferPool.getMaxConcurrentRequests(), slotOrDirBound));
        return Executors.newScheduledThreadPool(poolSize, new ExecutorThreadFactory("blocking-shuffle-io"));
    }
}
