/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.util.Preconditions;
import scala.Tuple2;

/**
 * An {@link InputChannel} that requests its subpartition through a
 * {@link PartitionRequestClient} obtained from the {@link ConnectionManager}.
 *
 * <p>Buffers handed to the channel via {@link #onBuffer(Buffer, int)} are queued in
 * {@code receivedBuffers} until they are polled with {@link #getNextBuffer()}. Every access to
 * {@code receivedBuffers} and to {@code expectedSequenceNumber} happens while holding the
 * monitor of {@code receivedBuffers}.
 */
public class RemoteInputChannel
extends InputChannel {
    /** ID of this channel, exposed through {@link #getInputChannelId()}. */
    private final InputChannelID id = new InputChannelID();

    /** Connection used to create the partition request client. */
    private final ConnectionID connectionId;

    /** Creates the partition request client and closes open channel connections on release. */
    private final ConnectionManager connectionManager;

    /** Received buffers that have not been polled yet; also serves as the lock object. */
    private final Queue<Buffer> receivedBuffers = new ArrayDeque<Buffer>();

    /** Set to {@code true} exactly once by {@link #releaseAllResources()}. */
    private final AtomicBoolean isReleased = new AtomicBoolean();

    /** Client created lazily by {@link #requestSubpartition(int)}; {@code null} before that. */
    private volatile PartitionRequestClient partitionRequestClient;

    /** Sequence number the next incoming (empty or non-empty) buffer must carry. */
    private int expectedSequenceNumber = 0;

    /**
     * Creates a channel with an initial and maximum backoff of {@code 0}.
     *
     * @param inputGate input gate passed on to the super class
     * @param channelIndex channel index passed on to the super class
     * @param partitionId ID of the partition to request
     * @param connectionId connection to create the partition request client for; must not be null
     * @param connectionManager connection manager; must not be null
     * @param metrics metric group providing the remote bytes-in counter
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, IOMetricGroup metrics) {
        this(inputGate, channelIndex, partitionId, connectionId, connectionManager, new Tuple2<Integer, Integer>(0, 0), metrics);
    }

    /**
     * Creates a channel with the given initial and maximum backoff.
     *
     * @param inputGate input gate passed on to the super class
     * @param channelIndex channel index passed on to the super class
     * @param partitionId ID of the partition to request
     * @param connectionId connection to create the partition request client for; must not be null
     * @param connectionManager connection manager; must not be null
     * @param initialAndMaxBackoff backoff configuration passed on to the super class
     * @param metrics metric group providing the remote bytes-in counter
     */
    public RemoteInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ConnectionID connectionId, ConnectionManager connectionManager, Tuple2<Integer, Integer> initialAndMaxBackoff, IOMetricGroup metrics) {
        super(inputGate, channelIndex, partitionId, initialAndMaxBackoff, metrics.getNumBytesInRemoteCounter());
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.connectionManager = Preconditions.checkNotNull(connectionManager);
    }

    /**
     * Creates the partition request client and requests the given subpartition with a delay of
     * {@code 0}. Does nothing if a client has already been created.
     *
     * @param subpartitionIndex index of the subpartition to request
     */
    @Override
    void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        if (partitionRequestClient == null) {
            partitionRequestClient = connectionManager.createPartitionRequestClient(connectionId);
            partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, 0);
        }
    }

    /**
     * Requests the subpartition again using the current backoff, or fails the partition request
     * when {@code increaseBackoff()} reports that the backoff cannot be increased any further.
     *
     * @param subpartitionIndex index of the subpartition to request
     * @throws IllegalStateException if no initial request has been made (as raised by
     *     {@code Preconditions.checkState})
     */
    void retriggerSubpartitionRequest(int subpartitionIndex) throws IOException, InterruptedException {
        Preconditions.checkState(partitionRequestClient != null, "Missing initial subpartition request.");
        if (increaseBackoff()) {
            partitionRequestClient.requestSubpartition(partitionId, subpartitionIndex, this, getCurrentBackoff());
        } else {
            failPartitionRequest();
        }
    }

    /**
     * Polls the next queued buffer and reports whether more buffers remain queued.
     *
     * @return the polled buffer together with a flag that is {@code true} if the queue was still
     *     non-empty right after polling
     * @throws IOException declared by the overridden method; {@code checkError()} is invoked
     *     before polling
     * @throws IllegalStateException if the channel has been released, if no subpartition has been
     *     requested yet, or if no buffer is queued
     */
    @Override
    InputChannel.BufferAndAvailability getNextBuffer() throws IOException {
        Preconditions.checkState(!isReleased.get(), "Queried for a buffer after channel has been closed.");
        Preconditions.checkState(partitionRequestClient != null, "Queried for a buffer before requesting a queue.");
        checkError();

        final Buffer next;
        final int remaining;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
            remaining = receivedBuffers.size();
        }

        // poll() returns null on an empty queue, e.g. when releaseAllResources() drained the
        // queue after the state check above. Fail with a descriptive message instead of a bare
        // NullPointerException on next.getSize().
        if (next == null) {
            if (isReleased.get()) {
                throw new IllegalStateException("Queried for a buffer after channel has been closed.");
            }
            throw new IllegalStateException("Queried for a buffer, but no buffer is queued in " + this + ".");
        }

        numBytesIn.inc(next.getSize());
        return new InputChannel.BufferAndAvailability(next, remaining > 0);
    }

    /**
     * Forwards the task event to the partition request client.
     *
     * @param event the event to send
     * @throws IllegalStateException if the channel has been released or no subpartition has been
     *     requested yet
     */
    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        Preconditions.checkState(!isReleased.get(), "Tried to send task event to producer after channel has been released.");
        Preconditions.checkState(partitionRequestClient != null, "Tried to send task event to producer before requesting a queue.");
        checkError();
        partitionRequestClient.sendTaskEvent(partitionId, event, this);
    }

    /** Returns whether {@link #releaseAllResources()} has been called. */
    @Override
    boolean isReleased() {
        return isReleased.get();
    }

    /** Intentionally a no-op for this channel type. */
    @Override
    void notifySubpartitionConsumed() {
    }

    /**
     * Releases the channel on the first call: recycles all queued buffers and then either closes
     * the partition request client for this channel or, if no client was created, asks the
     * connection manager to close open channel connections. Subsequent calls do nothing.
     */
    @Override
    void releaseAllResources() throws IOException {
        if (!isReleased.compareAndSet(false, true)) {
            return;
        }

        synchronized (receivedBuffers) {
            Buffer buffer;
            while ((buffer = receivedBuffers.poll()) != null) {
                buffer.recycle();
            }
        }

        if (partitionRequestClient != null) {
            partitionRequestClient.close(this);
        } else {
            connectionManager.closeOpenChannelConnections(connectionId);
        }
    }

    /** Sets a {@link PartitionNotFoundException} for this channel's partition as the error. */
    public void failPartitionRequest() {
        setError(new PartitionNotFoundException(partitionId));
    }

    @Override
    public String toString() {
        return "RemoteInputChannel [" + this.partitionId + " at " + this.connectionId + "]";
    }

    /** Returns the number of buffers currently queued. */
    public int getNumberOfQueuedBuffers() {
        synchronized (receivedBuffers) {
            return receivedBuffers.size();
        }
    }

    /** Returns the ID of this channel. */
    public InputChannelID getInputChannelId() {
        return id;
    }

    /**
     * Returns the buffer provider of the input gate.
     *
     * @return the input gate's buffer provider, or {@code null} if this channel has been released
     */
    public BufferProvider getBufferProvider() throws IOException {
        if (isReleased.get()) {
            return null;
        }
        return inputGate.getBufferProvider();
    }

    /**
     * Queues a received buffer if the channel is not released and the sequence number matches
     * the expected one. In every other case the buffer is recycled; on a sequence number
     * mismatch a {@link BufferReorderingException} is additionally reported via
     * {@link #onError(Throwable)}.
     *
     * @param buffer the received buffer
     * @param sequenceNumber the sequence number the buffer was sent with
     */
    public void onBuffer(Buffer buffer, int sequenceNumber) {
        boolean success = false;
        try {
            synchronized (receivedBuffers) {
                if (!isReleased.get()) {
                    if (expectedSequenceNumber == sequenceNumber) {
                        int available = receivedBuffers.size();
                        receivedBuffers.add(buffer);
                        ++expectedSequenceNumber;
                        // Only signal when the queue went from empty to non-empty.
                        if (available == 0) {
                            notifyChannelNonEmpty();
                        }
                        success = true;
                    } else {
                        onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                    }
                }
            }
        }
        finally {
            // The buffer was not handed over to the queue, so it has to be recycled here.
            if (!success) {
                buffer.recycle();
            }
        }
    }

    /**
     * Advances the expected sequence number for an empty buffer, or reports a
     * {@link BufferReorderingException} via {@link #onError(Throwable)} if the sequence number
     * does not match. Does nothing if the channel has been released.
     *
     * @param sequenceNumber the sequence number the empty buffer was sent with
     */
    public void onEmptyBuffer(int sequenceNumber) {
        synchronized (receivedBuffers) {
            if (!isReleased.get()) {
                if (expectedSequenceNumber == sequenceNumber) {
                    ++expectedSequenceNumber;
                } else {
                    onError(new BufferReorderingException(expectedSequenceNumber, sequenceNumber));
                }
            }
        }
    }

    /** Asks the input gate to trigger a partition state check for this channel's partition. */
    public void onFailedPartitionRequest() {
        inputGate.triggerPartitionStateCheck(partitionId);
    }

    /**
     * Records the given cause as this channel's error.
     *
     * @param cause the error to record
     */
    public void onError(Throwable cause) {
        setError(cause);
    }

    /**
     * Signals that a buffer arrived with a sequence number other than the expected one.
     */
    public static class BufferReorderingException
    extends IOException {
        private static final long serialVersionUID = -888282210356266816L;
        private final int expectedSequenceNumber;
        private final int actualSequenceNumber;

        /**
         * @param expectedSequenceNumber the sequence number the channel expected
         * @param actualSequenceNumber the sequence number that was actually received
         */
        public BufferReorderingException(int expectedSequenceNumber, int actualSequenceNumber) {
            this.expectedSequenceNumber = expectedSequenceNumber;
            this.actualSequenceNumber = actualSequenceNumber;
        }

        @Override
        public String getMessage() {
            return String.format("Buffer re-ordering: expected buffer with sequence number %d, but received %d.", this.expectedSequenceNumber, this.actualSequenceNumber);
        }
    }
}

