package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyConnectionReader;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.class */
public class DiskTierConsumerAgent implements TierConsumerAgent {
    private final Map<TieredStoragePartitionId, Map<TieredStorageSubpartitionId, CompletableFuture<NettyConnectionReader>>> nettyConnectionReaders = new HashMap();

    public DiskTierConsumerAgent(List<TieredStorageConsumerSpec> list, TieredStorageNettyService tieredStorageNettyService) {
        for (TieredStorageConsumerSpec tieredStorageConsumerSpec : list) {
            TieredStoragePartitionId partitionId = tieredStorageConsumerSpec.getPartitionId();
            Iterator<Integer> it = tieredStorageConsumerSpec.getSubpartitionIds().values().iterator();
            while (it.hasNext()) {
                int intValue = it.next().intValue();
                this.nettyConnectionReaders.computeIfAbsent(partitionId, tieredStoragePartitionId -> {
                    return new HashMap();
                }).put(new TieredStorageSubpartitionId(intValue), tieredStorageNettyService.registerConsumer(partitionId, new TieredStorageSubpartitionId(intValue)));
            }
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public void setup(TieredStorageMemoryManager tieredStorageMemoryManager) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public void start() {
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public void registerAvailabilityNotifier(AvailabilityNotifier availabilityNotifier) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public void updateTierShuffleDescriptor(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageInputChannelId tieredStorageInputChannelId, TieredStorageSubpartitionId tieredStorageSubpartitionId, TierShuffleDescriptor tierShuffleDescriptor) {
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public int peekNextBufferSubpartitionId(TieredStoragePartitionId tieredStoragePartitionId, ResultSubpartitionIndexSet resultSubpartitionIndexSet) throws IOException {
        Iterator<CompletableFuture<NettyConnectionReader>> it = this.nettyConnectionReaders.get(tieredStoragePartitionId).values().iterator();
        while (it.hasNext()) {
            try {
                int peekNextBufferSubpartitionId = it.next().get().peekNextBufferSubpartitionId();
                if (resultSubpartitionIndexSet.contains(peekNextBufferSubpartitionId)) {
                    return peekNextBufferSubpartitionId;
                }
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException("Failed to peek subpartition Id.", e);
            }
        }
        return -1;
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public Optional<Buffer> getNextBuffer(TieredStoragePartitionId tieredStoragePartitionId, TieredStorageSubpartitionId tieredStorageSubpartitionId, int i) {
        try {
            return this.nettyConnectionReaders.get(tieredStoragePartitionId).get(tieredStorageSubpartitionId).get().readBuffer(tieredStorageSubpartitionId.getSubpartitionId(), i);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException("Failed to get next buffer.", e);
        }
    }

    @Override // org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent
    public void close() throws IOException {
    }
}
