/*
 * Decompiled with CFR 0.152.
 */
package com.rabbitmq.qpid.protonj2.client.impl;

import com.rabbitmq.qpid.protonj2.buffer.ProtonCompositeBuffer;
import com.rabbitmq.qpid.protonj2.client.StreamDelivery;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientDeliveryAbortedException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientIllegalStateException;
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientResourceRemotelyClosedException;
import com.rabbitmq.qpid.protonj2.client.futures.ClientFuture;
import com.rabbitmq.qpid.protonj2.client.impl.ClientDeliverable;
import com.rabbitmq.qpid.protonj2.client.impl.ClientStreamReceiver;
import com.rabbitmq.qpid.protonj2.client.impl.ClientStreamReceiverMessage;
import com.rabbitmq.qpid.protonj2.engine.IncomingDelivery;
import com.rabbitmq.qpid.protonj2.engine.Scheduler;
import com.rabbitmq.qpid.protonj2.engine.exceptions.EngineFailedException;
import com.rabbitmq.qpid.protonj2.engine.util.StringUtils;
import com.rabbitmq.qpid.protonj2.types.messaging.Accepted;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ClientStreamDelivery
extends ClientDeliverable<ClientStreamDelivery, ClientStreamReceiver>
implements StreamDelivery {
    // Class-wide SLF4J logger.
    private static final Logger LOG = LoggerFactory.getLogger(ClientStreamDelivery.class);
    // Receiver passed to the constructor; used for dispositions, receiver options and session access.
    private final ClientStreamReceiver receiver;
    // Engine delivery wrapped by this object; the constructor registers this instance as its linked resource.
    private final IncomingDelivery protonDelivery;
    // Created on the first call to message(); remains null while only the raw stream API is used.
    private ClientStreamReceiverMessage message;
    // Created by message() or rawInputStream(), whichever is invoked first.
    private RawDeliveryInputStream rawInputStream;

    ClientStreamDelivery(ClientStreamReceiver receiver, IncomingDelivery protonDelivery) {
        super(receiver, protonDelivery);
        this.receiver = receiver;
        this.protonDelivery = protonDelivery.setLinkedResource(this);
        this.autoAcceptDeliveryIfNecessary();
        protonDelivery.deliveryReadHandler(this::handleDeliveryRead).deliveryAbortedHandler(this::handleDeliveryAborted);
    }

    /**
     * @return this instance, typed as {@link ClientStreamDelivery}
     */
    @Override
    protected ClientStreamDelivery self() {
        final ClientStreamDelivery me = this;
        return me;
    }

    /**
     * @return the stream receiver that was supplied when this delivery was created
     */
    @Override
    public ClientStreamReceiver receiver() {
        return receiver;
    }

    /**
     * @return the aborted flag reported by the wrapped engine delivery
     */
    @Override
    public boolean aborted() {
        return protonDelivery.isAborted();
    }

    /**
     * @return {@code true} once the wrapped engine delivery is no longer partial
     */
    @Override
    public boolean completed() {
        final boolean stillPartial = protonDelivery.isPartial();
        return !stillPartial;
    }

    /**
     * Returns the message view of this delivery, creating it (and the raw stream
     * that backs it) on first use.
     *
     * @return the message associated with this delivery
     * @throws ClientException if the raw input stream was already requested directly
     */
    @Override
    public ClientStreamReceiverMessage message() throws ClientException {
        if (message != null) {
            return message;
        }

        // A stream without a message means rawInputStream() was called first.
        if (rawInputStream != null) {
            throw new ClientIllegalStateException("Cannot access Delivery Message API after requesting an InputStream");
        }

        rawInputStream = new RawDeliveryInputStream();
        message = new ClientStreamReceiverMessage(receiver, this, rawInputStream);

        return message;
    }

    /**
     * Returns the delivery annotations of the message as a string-keyed map.
     *
     * @return the converted annotations; the converter receives {@code null} when the
     *         message carries no delivery annotations
     * @throws ClientException if the raw input stream was already requested directly
     */
    @Override
    public Map<String, Object> annotations() throws ClientException {
        final boolean streamRequestedWithoutMessage = rawInputStream != null && message == null;
        if (streamRequestedWithoutMessage) {
            throw new ClientIllegalStateException("Cannot access Delivery Annotations API after requesting an InputStream");
        }

        final ClientStreamReceiverMessage streamMessage = message();

        return StringUtils.toStringKeyedMap(
            streamMessage.deliveryAnnotations() == null ? null : streamMessage.deliveryAnnotations().getValue());
    }

    /**
     * Returns the raw byte stream of this delivery, creating it on first use.
     *
     * @return the stream over the delivery's bytes
     * @throws ClientException if the message API was already used for this delivery
     */
    @Override
    public InputStream rawInputStream() throws ClientException {
        // Once a message exists it owns the stream, so direct access is refused.
        if (message != null) {
            throw new ClientIllegalStateException("Cannot access Delivery InputStream API after requesting an Message");
        }

        if (rawInputStream == null) {
            rawInputStream = new RawDeliveryInputStream();
        }

        return rawInputStream;
    }

    /**
     * Read handler registered on the engine delivery: forwards the event to the raw
     * stream if one exists, then always re-checks whether the delivery can be auto accepted.
     *
     * @param delivery the engine delivery the event was raised for
     */
    void handleDeliveryRead(IncomingDelivery delivery) {
        final RawDeliveryInputStream stream = rawInputStream;

        try {
            if (stream != null) {
                stream.handleDeliveryRead(delivery);
            }
        } finally {
            autoAcceptDeliveryIfNecessary();
        }
    }

    /**
     * Abort handler registered on the engine delivery: forwards the event to the raw
     * stream if one exists, then always settles the delivery without an outcome.
     *
     * @param delivery the engine delivery that was aborted
     */
    void handleDeliveryAborted(IncomingDelivery delivery) {
        try {
            if (rawInputStream != null) {
                rawInputStream.handleDeliveryAborted(delivery);
            }
        } finally {
            try {
                receiver.disposition(delivery, null, true);
            } catch (Exception error) {
                // Settling here is best effort, so the failure is not propagated; it is
                // traced rather than silently dropped, as the auto-accept path does.
                LOG.trace("Caught error while attempting to settle the aborted delivery.", error);
            }
        }
    }

    /**
     * Forwards a receiver-closed notification to the raw stream, if one was created.
     *
     * @param receiver the receiver that was closed
     */
    void handleReceiverClosed(ClientStreamReceiver receiver) {
        if (rawInputStream == null) {
            return;
        }

        rawInputStream.handleReceiverClosed(receiver);
    }

    /**
     * Applies an {@code Accepted} disposition when the receiver options enable auto
     * accept and the delivery is neither settled nor still partial. Errors raised by
     * the disposition are traced and otherwise ignored.
     */
    private void autoAcceptDeliveryIfNecessary() {
        if (!receiver.receiverOptions().autoAccept()) {
            return;
        }

        if (protonDelivery.isSettled() || protonDelivery.isPartial()) {
            return;
        }

        try {
            receiver.disposition(protonDelivery, Accepted.getInstance(), receiver.receiverOptions().autoSettle());
        } catch (Exception error) {
            LOG.trace("Caught error while attempting to auto accept the fully read delivery.", error);
        }
    }

    private class RawDeliveryInputStream
    extends InputStream {
        // Value held by markIndex whenever no mark is active.
        private final int INVALID_MARK = -1;
        // Composite buffer that accumulates bytes read from the delivery until the stream consumes them.
        private final ProtonCompositeBuffer buffer;
        // Scheduler obtained from the receiver's session; work that touches the delivery is submitted to it.
        private final Scheduler executor;
        // Flipped to true by the first close(); consulted by checkStreamStateIsValid() and handleDeliveryRead().
        private final AtomicBoolean closed;
        // Pending request parked by requestMoreData() while the delivery is partial with no bytes available.
        private ClientFuture<Integer> readRequest;
        // Buffer read offset captured by mark(), or -1 when no mark is set.
        private int markIndex;
        // Read limit supplied to mark(); used by tryReleaseReadBuffers() to decide when consumed data can be dropped.
        private int markLimit;

        /**
         * Creates an open stream with no mark set, using the session's scheduler and a
         * read-only composite buffer from the engine's configured buffer allocator.
         */
        public RawDeliveryInputStream() {
            executor = receiver.session().getScheduler();
            closed = new AtomicBoolean();
            markIndex = INVALID_MARK;
            buffer = receiver.session()
                             .connection()
                             .getEngine()
                             .configuration()
                             .getBufferAllocator()
                             .composite()
                             .convertToReadOnly();
        }

        /**
         * Closes the stream. Every call clears the mark; only the first call performs
         * the close work: a task submitted to the executor reads whatever the
         * delivery still holds, closes the buffer and completes any parked read request
         * with {@code -1}, and the caller waits for that task through a session request.
         * Failures while doing so are logged at debug level and not propagated.
         *
         * @throws IOException declared for the {@link InputStream} contract
         */
        @Override
        public void close() throws IOException {
            markLimit = 0;
            markIndex = INVALID_MARK;

            // Only the first caller performs the close work.
            if (!closed.compareAndSet(false, true)) {
                return;
            }

            final ClientFuture<Void> closeRequest = receiver.session().getFutureFactory().createFuture();

            try {
                executor.execute(() -> {
                    try {
                        // Read and discard whatever the delivery still holds.
                        protonDelivery.readAll();
                    } catch (EngineFailedException ignored) {
                        // deliberately ignored
                    }

                    try {
                        buffer.close();
                    } catch (Exception ignored) {
                        // deliberately ignored
                    }

                    // A reader parked in requestMoreData() is released with end-of-stream.
                    if (readRequest != null) {
                        readRequest.complete(-1);
                        readRequest = null;
                    }

                    closeRequest.complete(null);
                });

                receiver.session().request(receiver, closeRequest);
            } catch (Exception error) {
                LOG.debug("Ignoring error on RawInputStream close: ", error);
            } finally {
                super.close();

                // Closed again on the calling thread, regardless of how the request ended.
                try {
                    buffer.close();
                } catch (Exception ignored) {
                    // deliberately ignored
                }
            }
        }

        /**
         * @return always {@code true}; this stream implements {@code mark} and {@code reset}
         */
        @Override
        public boolean markSupported() {
            return true;
        }

        /**
         * Records the buffer's current read offset as the mark position together with
         * the supplied read limit.
         *
         * @param readlimit the read limit to remember for this mark
         */
        @Override
        public synchronized void mark(int readlimit) {
            markIndex = buffer.getReadOffset();
            markLimit = readlimit;
        }

        /**
         * Moves the buffer's read offset back to the marked position and clears the
         * mark. Does nothing when no mark is currently set.
         *
         * @throws IOException declared for the {@link InputStream} contract
         */
        @Override
        public synchronized void reset() throws IOException {
            if (markIndex == INVALID_MARK) {
                return;
            }

            buffer.setReadOffset(markIndex);
            markIndex = INVALID_MARK;
            markLimit = 0;
        }

        /**
         * Reports how many bytes are readable from the local buffer. When the buffer is
         * empty, a task on the executor first appends any bytes the delivery has
         * available and the resulting readable count is returned.
         *
         * @return the number of readable bytes in the buffer
         * @throws IOException if the stream or receiver is closed, or the request fails
         */
        @Override
        public int available() throws IOException {
            checkStreamStateIsValid();

            if (!buffer.isReadable()) {
                final ClientFuture<Integer> request = receiver.session().getFutureFactory().createFuture();

                try {
                    executor.execute(() -> {
                        if (protonDelivery.available() > 0) {
                            buffer.append(protonDelivery.readAll());
                        }

                        request.complete(buffer.getReadableBytes());
                    });

                    return receiver.session().request(receiver, request);
                } catch (Exception e) {
                    throw new IOException("Error reading requested data", e);
                }
            }

            return buffer.getReadableBytes();
        }

        /**
         * Reads a single byte, requesting more data from the delivery while the local
         * buffer is empty.
         *
         * @return the next byte as a value in {@code 0..255}, or {@code -1} when the
         *         request for more data reports end of stream
         * @throws IOException if the stream or receiver is closed, or a data request fails
         */
        @Override
        public int read() throws IOException {
            checkStreamStateIsValid();

            while (!buffer.isReadable()) {
                if (requestMoreData() < 0) {
                    return -1;
                }
            }

            final int value = buffer.readByte() & 0xFF;
            tryReleaseReadBuffers();

            return value;
        }

        /**
         * Reads up to {@code length} bytes into {@code target}, requesting more data from
         * the delivery whenever the local buffer runs dry. The loop only ends once
         * {@code length} bytes were copied or a data request reports end of stream.
         *
         * @param target destination array
         * @param offset index in {@code target} of the first byte written
         * @param length maximum number of bytes to read
         * @return the number of bytes copied, {@code 0} when {@code length} is not positive,
         *         or {@code -1} when end of stream was reached before any byte was copied
         * @throws IOException if the stream or receiver is closed, or a data request fails
         * @throws IndexOutOfBoundsException if {@code offset}/{@code length} do not fit {@code target}
         */
        @Override
        public int read(byte[] target, int offset, int length) throws IOException {
            checkStreamStateIsValid();
            Objects.checkFromIndexSize(offset, length, target.length);

            if (length <= 0) {
                return 0;
            }

            int copied = 0;
            int outstanding = length;

            while (outstanding > 0) {
                if (buffer.isReadable()) {
                    // Take whatever is buffered, capped at what the caller still wants.
                    final int chunk = Math.min(buffer.getReadableBytes(), outstanding);

                    buffer.readBytes(target, offset + copied, chunk);
                    copied += chunk;
                    outstanding -= chunk;

                    tryReleaseReadBuffers();
                } else if (requestMoreData() < 0) {
                    return copied > 0 ? copied : -1;
                }
            }

            return copied;
        }

        /**
         * Skips up to {@code amount} bytes, requesting more data from the delivery
         * whenever the local buffer runs dry, and stopping early at end of stream.
         *
         * @param amount number of bytes to skip
         * @return the number of bytes actually skipped; {@code 0} when {@code amount} is not positive
         * @throws IOException if the stream or receiver is closed, or a data request fails
         */
        @Override
        public long skip(long amount) throws IOException {
            checkStreamStateIsValid();

            if (amount <= 0L) {
                return 0L;
            }

            long outstanding = amount;

            while (outstanding > 0L) {
                if (buffer.isReadable()) {
                    // Bounded by the int readable count, so the narrowing cast is safe.
                    final int step = (int) Math.min((long) buffer.getReadableBytes(), outstanding);

                    buffer.advanceReadOffset(step);
                    outstanding -= step;

                    tryReleaseReadBuffers();
                } else if (requestMoreData() < 0) {
                    break;
                }
            }

            return amount - outstanding;
        }

        /**
         * Validates the stream state and then delegates to the inherited
         * {@link InputStream#transferTo(OutputStream)} implementation.
         *
         * @param target the stream that receives the bytes
         * @return the number of bytes transferred
         * @throws IOException if the stream or receiver is closed, or the transfer fails
         */
        @Override
        public long transferTo(OutputStream target) throws IOException {
            checkStreamStateIsValid();

            final long transferred = super.transferTo(target);
            return transferred;
        }

        /**
         * Drops already-consumed buffer components once the read offset has moved more
         * than {@code markLimit} past {@code markIndex}; doing so also clears the mark.
         * Errors raised while splitting or closing are ignored.
         */
        private void tryReleaseReadBuffers() {
            final int readOffset = buffer.getReadOffset();

            // Still within the marked window: keep the data so reset() can rewind.
            if (readOffset - markIndex <= markLimit) {
                return;
            }

            markIndex = INVALID_MARK;
            markLimit = 0;

            try {
                buffer.splitComponentsFloor(readOffset).close();
            } catch (Exception ignored) {
                // deliberately ignored
            }
        }

        /**
         * Reacts to a read event on the delivery.
         * <p>
         * When the stream is closed the delivery's bytes are read and discarded. Otherwise,
         * if a read request is parked and not yet complete, it is completed with the
         * readable byte count when the delivery has bytes available, or with {@code -1}
         * when the delivery is no longer partial.
         * <p>
         * The parked request is cleared only in the branches that complete it. If the
         * event carried no bytes and the delivery is still partial, the request stays
         * parked so a later read, abort or close event can still complete it; clearing
         * it unconditionally would leave the waiting reader with a future nobody completes.
         *
         * @param delivery the engine delivery the event was raised for
         */
        private void handleDeliveryRead(IncomingDelivery delivery) {
            if (closed.get()) {
                delivery.readAll();
                return;
            }

            if (readRequest == null || readRequest.isComplete()) {
                return;
            }

            if (delivery.available() > 0) {
                buffer.append(protonDelivery.readAll());
                readRequest.complete(buffer.getReadableBytes());
                readRequest = null;
            } else if (!delivery.isPartial()) {
                readRequest.complete(-1);
                readRequest = null;
            }
            // Otherwise: no bytes yet and more frames expected, so the request stays parked.
        }

        /**
         * Fails the parked read request, if any, with a delivery-aborted error.
         *
         * @param delivery the engine delivery that was aborted (not used here)
         */
        private void handleDeliveryAborted(IncomingDelivery delivery) {
            final ClientFuture<Integer> pending = readRequest;
            if (pending == null) {
                return;
            }

            pending.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
        }

        /**
         * Fails the parked read request, if any, with a remotely-closed error.
         *
         * @param receiver the receiver that was closed (not used here)
         */
        private void handleReceiverClosed(ClientStreamReceiver receiver) {
            final ClientFuture<Integer> pending = readRequest;
            if (pending == null) {
                return;
            }

            pending.failed(new ClientResourceRemotelyClosedException("The receiver link has been remotely closed."));
        }

        /**
         * Asks the executor to move bytes from the delivery into the local buffer and
         * waits for the outcome through a session request.
         * <p>
         * The task fails the request when the link is locally closed or detached, or when
         * the delivery is aborted with nothing left to read; completes it with the readable
         * byte count when bytes were available; completes it with {@code -1} when the
         * delivery is no longer partial; and otherwise parks it in {@code readRequest}.
         *
         * @return the number of readable bytes now buffered, or {@code -1} at end of stream
         * @throws IOException wrapping any failure of the request
         */
        private int requestMoreData() throws IOException {
            final ClientFuture<Integer> request = receiver.session().getFutureFactory().createFuture();

            try {
                executor.execute(() -> {
                    if (protonDelivery.getLink().isLocallyClosedOrDetached()) {
                        request.failed(new ClientException("Cannot read from delivery due to link having been closed"));
                        return;
                    }

                    if (protonDelivery.available() > 0) {
                        buffer.append(protonDelivery.readAll());
                        request.complete(buffer.getReadableBytes());
                        return;
                    }

                    if (protonDelivery.isAborted()) {
                        request.failed(new ClientDeliveryAbortedException("The remote sender has aborted this delivery"));
                        return;
                    }

                    if (protonDelivery.isPartial()) {
                        // Nothing to hand over yet: park the request.
                        readRequest = request;
                    } else {
                        request.complete(-1);
                    }
                });

                return receiver.session().request(receiver, request);
            } catch (Exception e) {
                throw new IOException("Error reading requested data", e);
            }
        }

        /**
         * Rejects stream operations once the stream has been closed or the underlying
         * receiver reports itself closed.
         *
         * @throws IOException if the stream was closed, or if the receiver is closed
         *         (carrying the receiver's failure cause)
         */
        private void checkStreamStateIsValid() throws IOException {
            final boolean streamClosed = closed.get();
            if (streamClosed) {
                throw new IOException("The InputStream has been explicitly closed");
            }

            if (receiver.isClosed()) {
                throw new IOException("Underlying receiver has closed", receiver.getFailureCause());
            }
        }
    }
}

