package org.apache.flink.runtime.io.network.partition;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.shaded.guava32.com.google.common.collect.BiMap;
import org.apache.flink.shaded.guava32.com.google.common.collect.HashBiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.class */
/**
 * A {@link ResultSubpartitionView} that presents several underlying views as a single one.
 *
 * <p>Underlying views are registered through {@link #notifyViewCreated(int, ResultSubpartitionView)}
 * and report new data through {@link #notifyDataAvailable(ResultSubpartitionView)}. Buffers are
 * pulled from the views chosen by a {@link SubpartitionSelector}, kept in a small bounded cache
 * together with the id of the view they came from, and handed out with sequence numbers assigned
 * by this class.
 *
 * <p>All mutable state is guarded by {@link #lock}; the only exception is {@link
 * #unsynchronizedGetNumberOfQueuedBuffers()}, which deliberately reads without the lock.
 */
public class UnionResultSubpartitionView implements ResultSubpartitionView, BufferAvailabilityListener {
    private static final Logger LOG = LoggerFactory.getLogger(UnionResultSubpartitionView.class);

    /** Upper bound on the number of buffers held in {@link #cachedBuffers} by {@link #cacheBuffer()}. */
    private static final int CACHE_CAPACITY = 10;

    /** Downstream listener that is told when this union view has data to offer. */
    private final BufferAvailabilityListener availabilityListener;

    /** Number of underlying views that must be registered before data is forwarded. */
    private final int numTotalViews;

    private final Object lock = new Object();

    /** Registered views keyed by the id passed to {@link #notifyViewCreated}; bidirectional lookup. */
    @GuardedBy("lock")
    private final BiMap<Integer, ResultSubpartitionView> allViews = HashBiMap.create();

    /** Decides which registered view is polled next. */
    @GuardedBy("lock")
    private final SubpartitionSelector<ResultSubpartitionView> availableViews =
            new RoundRobinSubpartitionSelector<>();

    /** Buffers already pulled from the underlying views, paired with the id of their source view. */
    @GuardedBy("lock")
    private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
            new LinkedList<>();

    /** Views that signalled data availability before they were registered in {@link #allViews}. */
    @GuardedBy("lock")
    private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>();

    @GuardedBy("lock")
    private boolean isReleased = false;

    /** Sequence number attached to the next buffer returned by {@link #getNextBuffer()}. */
    @GuardedBy("lock")
    private int sequenceNumber = 0;

    /**
     * Creates a union view.
     *
     * @param availabilityListener listener notified when this view has cached data to hand out
     * @param numTotalViews number of underlying views expected to be registered
     */
    public UnionResultSubpartitionView(BufferAvailabilityListener availabilityListener, int numTotalViews) {
        this.availabilityListener = availabilityListener;
        this.numTotalViews = numTotalViews;
    }

    /**
     * Registers an underlying view. Once the number of registered views equals the expected total,
     * every availability notification that arrived too early is replayed.
     *
     * @param subpartitionId id under which the view is registered
     * @param view the underlying view
     */
    public void notifyViewCreated(int subpartitionId, ResultSubpartitionView view) {
        synchronized (lock) {
            allViews.put(subpartitionId, view);
            if (allViews.size() == numTotalViews) {
                // Replay notifications that arrived before their view was registered.
                for (ResultSubpartitionView pendingView : unregisteredAvailableViews) {
                    notifyDataAvailable(pendingView);
                }
                unregisteredAvailableViews.clear();
            }
        }
    }

    /**
     * Returns the id of the view that produced the buffer at the head of the cache, refilling the
     * cache first.
     *
     * @return the source id of the next buffer, or {@code -1} if no buffer is cached
     * @throws IOException if pulling a buffer from an underlying view fails
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int peekNextBufferSubpartitionId() throws IOException {
        synchronized (lock) {
            cacheBuffer();
            Tuple2<ResultSubpartition.BufferAndBacklog, Integer> head = cachedBuffers.peek();
            return head == null ? -1 : head.f1;
        }
    }

    /**
     * Removes and returns the buffer at the head of the cache, refilling the cache first.
     *
     * <p>The returned {@link ResultSubpartition.BufferAndBacklog} is rebuilt by this class: it
     * carries the polled buffer, the number of entries left in the cache, the data type of the new
     * head entry ({@link Buffer.DataType#NONE} if the cache is now empty) and this view's own
     * running sequence number.
     *
     * @return the next buffer, or {@code null} if no buffer is available
     * @throws IOException if pulling a buffer from an underlying view fails
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    @Nullable
    public ResultSubpartition.BufferAndBacklog getNextBuffer() throws IOException {
        synchronized (lock) {
            cacheBuffer();
            Tuple2<ResultSubpartition.BufferAndBacklog, Integer> head = cachedBuffers.poll();
            if (head == null) {
                return null;
            }
            Buffer buffer = head.f0.buffer();
            int remaining = cachedBuffers.size();
            Tuple2<ResultSubpartition.BufferAndBacklog, Integer> next = cachedBuffers.peek();
            Buffer.DataType nextDataType =
                    next == null ? Buffer.DataType.NONE : next.f0.buffer().getDataType();
            return new ResultSubpartition.BufferAndBacklog(buffer, remaining, nextDataType, sequenceNumber++);
        }
    }

    /**
     * Pulls buffers from the views chosen by the selector until the cache holds {@link
     * #CACHE_CAPACITY} entries, the selector offers no further view, or a view yields nothing and
     * the selector reports that no other view can be switched to. Must be called with {@link
     * #lock} held.
     */
    private void cacheBuffer() throws IOException {
        while (cachedBuffers.size() < CACHE_CAPACITY) {
            ResultSubpartitionView view = availableViews.getNextSubpartitionToConsume();
            if (view == null) {
                return;
            }
            ResultSubpartition.BufferAndBacklog nextBuffer = view.getNextBuffer();
            if (nextBuffer == null) {
                availableViews.markLastConsumptionStatus(false, false);
                if (!availableViews.isMoreSubpartitionSwitchable()) {
                    return;
                }
            } else {
                availableViews.markLastConsumptionStatus(
                        true, nextBuffer.buffer().getDataType().isPartialRecord());
                cachedBuffers.add(Tuple2.of(nextBuffer, allViews.inverse().get(view)));
            }
        }
    }

    /** Not supported by this view; always throws {@link UnsupportedOperationException}. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyDataAvailable() {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /**
     * Handles an availability notification from an underlying view.
     *
     * <p>Notifications from views that are not registered yet are remembered and replayed by
     * {@link #notifyViewCreated}. Otherwise the selector is informed; the downstream listener is
     * only notified if the selector accepted the notification, the cache was empty beforehand, all
     * expected views are registered, and refilling the cache actually produced a buffer.
     *
     * @param view the underlying view that has data
     * @throws RuntimeException wrapping any {@link IOException} raised while refilling the cache
     */
    @Override // org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener
    public void notifyDataAvailable(ResultSubpartitionView view) {
        synchronized (lock) {
            if (!allViews.containsValue(view)) {
                unregisteredAvailableViews.add(view);
                return;
            }
            // The selector must always be informed, so this call has to come first.
            if (!availableViews.notifyDataAvailable(view) || !cachedBuffers.isEmpty()) {
                return;
            }
            if (allViews.size() < numTotalViews) {
                return;
            }
            try {
                cacheBuffer();
                if (!cachedBuffers.isEmpty()) {
                    availabilityListener.notifyDataAvailable(this);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Not supported by this view; always throws {@link UnsupportedOperationException}. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyPriorityEvent(int priorityBufferNumber) {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /**
     * Releases every registered and every not-yet-registered view, recycles all cached buffers and
     * marks this view as released.
     *
     * <p>A failure while releasing one view does not stop the release of the others, and cached
     * buffers are recycled even if releasing a view fails.
     *
     * @throws IOException the first {@link IOException} raised by an underlying view; further ones
     *     are attached to it as suppressed exceptions
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void releaseAllResources() throws IOException {
        synchronized (lock) {
            IOException failure = null;
            try {
                for (ResultSubpartitionView view : allViews.values()) {
                    failure = releaseView(view, failure);
                }
                allViews.clear();
                for (ResultSubpartitionView view : unregisteredAvailableViews) {
                    failure = releaseView(view, failure);
                }
                unregisteredAvailableViews.clear();
            } finally {
                // Runs even on unchecked failures above so that cached buffers are never leaked.
                for (Tuple2<ResultSubpartition.BufferAndBacklog, Integer> cached : cachedBuffers) {
                    cached.f0.buffer().recycleBuffer();
                }
                cachedBuffers.clear();
                isReleased = true;
            }
            if (failure != null) {
                throw failure;
            }
        }
    }

    /**
     * Releases one view and folds a possible {@link IOException} into the failure collected so far.
     *
     * @return the first failure seen so far, or {@code null} if there has been none
     */
    @Nullable
    private static IOException releaseView(ResultSubpartitionView view, @Nullable IOException firstFailure) {
        try {
            view.releaseAllResources();
        } catch (IOException e) {
            if (firstFailure == null) {
                return e;
            }
            firstFailure.addSuppressed(e);
        }
        return firstFailure;
    }

    /** Returns whether {@link #releaseAllResources()} has been called on this view. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public boolean isReleased() {
        synchronized (lock) {
            return isReleased;
        }
    }

    /** Not supported by this view; always throws {@link UnsupportedOperationException}. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void resumeConsumption() {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /** Not supported by this view; always throws {@link UnsupportedOperationException}. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void acknowledgeAllDataProcessed() {
        throw new UnsupportedOperationException("Method should never be called.");
    }

    /**
     * Checks every registered view for a failure and logs each one found.
     *
     * @return the failure of the last registered view (in iteration order) that reported one, or
     *     {@code null} if no registered view has failed
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public Throwable getFailureCause() {
        Throwable lastCause = null;
        synchronized (lock) {
            for (ResultSubpartitionView view : allViews.values()) {
                // Query once so the logged and the returned throwable are the same object.
                Throwable cause = view.getFailureCause();
                if (cause != null) {
                    lastCause = cause;
                    LOG.error("A subpartition view reported a failure.", cause);
                }
            }
        }
        return lastCause;
    }

    /**
     * Reports availability and backlog based on the cache contents, refilling the cache first.
     *
     * <p>With an empty cache the result is "not available, backlog 0". Otherwise the view is
     * reported available if {@code isCreditAvailable} is set or the head entry is an event, and
     * the backlog is the number of cached entries whose data type is a buffer.
     *
     * @param isCreditAvailable flag that makes a non-empty cache count as available regardless of
     *     the head entry's type (presumably the consumer's credit state; confirm against the
     *     {@link ResultSubpartitionView} contract)
     * @throws RuntimeException wrapping any {@link IOException} raised while refilling the cache
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public ResultSubpartitionView.AvailabilityWithBacklog getAvailabilityAndBacklog(boolean isCreditAvailable) {
        synchronized (lock) {
            try {
                cacheBuffer();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
            Tuple2<ResultSubpartition.BufferAndBacklog, Integer> head = cachedBuffers.peek();
            if (head == null) {
                return new ResultSubpartitionView.AvailabilityWithBacklog(false, 0);
            }
            boolean available = isCreditAvailable || head.f0.buffer().getDataType().isEvent();
            int backlog =
                    (int)
                            cachedBuffers.stream()
                                    .filter(cached -> cached.f0.buffer().getDataType().isBuffer())
                                    .count();
            return new ResultSubpartitionView.AvailabilityWithBacklog(available, backlog);
        }
    }

    /**
     * Forwards the call to the view registered under {@code subpartitionId}.
     *
     * @param subpartitionId id of the target view; a {@link NullPointerException} is thrown if no
     *     view is registered under it
     * @param segmentId value passed through unchanged to the target view
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyRequiredSegmentId(int subpartitionId, int segmentId) {
        synchronized (lock) {
            allViews.get(subpartitionId).notifyRequiredSegmentId(subpartitionId, segmentId);
        }
    }

    /** Returns the number of cached entries without taking the lock; the value may be stale. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        return cachedBuffers.size();
    }

    /** Returns the number of cached entries. */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public int getNumberOfQueuedBuffers() {
        synchronized (lock) {
            return cachedBuffers.size();
        }
    }

    /**
     * Forwards the new buffer size to every registered view.
     *
     * @param newBufferSize value passed through unchanged to each view
     */
    @Override // org.apache.flink.runtime.io.network.partition.ResultSubpartitionView
    public void notifyNewBufferSize(int newBufferSize) {
        synchronized (lock) {
            for (ResultSubpartitionView view : allViews.values()) {
                view.notifyNewBufferSize(newBufferSize);
            }
        }
    }
}
