package io.vertx.amqp.impl;

import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpMessage;
import io.vertx.amqp.AmqpSender;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonSenderImpl;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;

/* loaded from: input_file:io/vertx/amqp/impl/AmqpSenderImpl.class */
public class AmqpSenderImpl implements AmqpSender {
    /** Shared logger; note it is named after the {@link AmqpSender} interface, not this class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(AmqpSender.class);
    /** Underlying Proton sender that this class delegates to. */
    private final ProtonSender sender;
    /** Connection this sender was created on; used to register/unregister and to schedule work. */
    private final AmqpConnectionImpl connection;
    /** Set once the sender has been closed locally or remotely; accessed while holding this instance's monitor. */
    private boolean closed;
    /** Handler notified by {@code onClose} when the link is closed or detached remotely; may be null. */
    private Handler<Throwable> exceptionHandler;
    /** Handler notified from the Proton send-queue-drain callback; may be null. */
    private Handler<Void> drainHandler;
    /** Locally cached view of the remote credit; {@code writeQueueFull()} reports full when it is not positive. */
    private long remoteCredit = 0;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.vertx.amqp.impl.AmqpSenderImpl$1, reason: invalid class name */
    /* loaded from: input_file:io/vertx/amqp/impl/AmqpSenderImpl$1.class */
    /**
     * Decompiler-materialised switch map for {@link DeliveryState.DeliveryStateType}.
     *
     * <p>The array is indexed by enum ordinal and holds a dense case number:
     * {@code Rejected -> 1}, {@code Modified -> 2}, {@code Released -> 3}, {@code Accepted -> 4}.
     * Ordinals that are never assigned keep the default value {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        // Sized from values().length so every ordinal of the enum has a slot.
        static final /* synthetic */ int[] $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType = new int[DeliveryState.DeliveryStateType.values().length];

        static {
            // Each assignment is isolated in its own try block so that a NoSuchFieldError for one
            // constant does not prevent the others from being mapped. NOTE(review): this is presumably
            // the usual compiler-generated guard against an enum constant missing at runtime.
            try {
                $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType[DeliveryState.DeliveryStateType.Rejected.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType[DeliveryState.DeliveryStateType.Modified.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType[DeliveryState.DeliveryStateType.Released.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$qpid$proton$amqp$transport$DeliveryState$DeliveryStateType[DeliveryState.DeliveryStateType.Accepted.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    private AmqpSenderImpl(ProtonSender protonSender, AmqpConnectionImpl amqpConnectionImpl, Handler<AsyncResult<AmqpSender>> handler) {
        this.sender = protonSender;
        this.connection = amqpConnectionImpl;
        protonSender.closeHandler(asyncResult -> {
            onClose(protonSender, asyncResult, false);
        }).detachHandler(asyncResult2 -> {
            onClose(protonSender, asyncResult2, true);
        });
        protonSender.sendQueueDrainHandler(protonSender2 -> {
            Handler<Void> handler2 = null;
            synchronized (this) {
                this.remoteCredit = ((ProtonSenderImpl) protonSender).getRemoteCredit();
                if (this.drainHandler != null) {
                    handler2 = this.drainHandler;
                }
            }
            if (handler2 != null) {
                handler2.handle((Object) null);
            }
        });
        protonSender.openHandler(asyncResult3 -> {
            if (asyncResult3.failed()) {
                handler.handle(asyncResult3.mapEmpty());
            } else {
                amqpConnectionImpl.register(this);
                handler.handle(Future.succeededFuture(this));
            }
        });
        protonSender.open();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a sender around {@code protonSender} and starts opening it.
     *
     * <p>Nothing is returned: the new instance is delivered to {@code handler} by the constructor's
     * open callback.
     *
     * @param protonSender the Proton sender to wrap
     * @param amqpConnectionImpl the connection owning the sender
     * @param handler receives the opened sender or the open failure
     */
    public static void create(
            ProtonSender protonSender,
            AmqpConnectionImpl amqpConnectionImpl,
            Handler<AsyncResult<AmqpSender>> handler) {
        // Construction alone is enough; the constructor opens the sender and reports the result.
        new AmqpSenderImpl(protonSender, amqpConnectionImpl, handler);
    }

    /**
     * Handles a close or detach initiated by the remote peer.
     *
     * <p>Only the first close (local or remote) has any effect: it marks the sender closed, reports
     * an exception to the registered exception handler, if any, and then mirrors the remote action
     * on the local link.
     *
     * @param protonSender the Proton sender whose link was closed or detached
     * @param asyncResult the remote close/detach result; its cause is attached to the reported exception on failure
     * @param z {@code true} to answer with {@code detach()}, {@code false} to answer with {@code close()}
     */
    private void onClose(ProtonSender protonSender, AsyncResult<ProtonSender> asyncResult, boolean z) {
        final boolean firstClose;
        final Handler<Throwable> listener;
        synchronized (this) {
            firstClose = !this.closed;
            // The exception handler is only notified for the transition into the closed state.
            listener = firstClose ? this.exceptionHandler : null;
            this.closed = true;
        }

        if (listener != null) {
            final Exception failure = asyncResult.succeeded()
                ? new Exception("Sender closed remotely")
                : new Exception("Sender closed remotely with error", asyncResult.cause());
            listener.handle(failure);
        }

        if (!firstClose) {
            return;
        }
        if (z) {
            protonSender.detach();
        } else {
            protonSender.close();
        }
    }

    /**
     * Reports whether the sender is out of credit according to the locally cached value.
     *
     * @return {@code true} when the cached remote credit is zero or negative
     */
    public synchronized boolean writeQueueFull() {
        final boolean hasCredit = remoteCredit > 0;
        return !hasCredit;
    }

    /**
     * Returns the connection this sender was created on.
     *
     * @return the owning connection
     */
    @Override // io.vertx.amqp.AmqpSender
    public AmqpConnection connection() {
        return connection;
    }

    /**
     * Sends a message without an acknowledgement handler.
     *
     * @param amqpMessage the message to send
     * @return this sender, for chaining
     */
    @Override // io.vertx.amqp.AmqpSender
    public AmqpSender send(AmqpMessage amqpMessage) {
        final Handler<AsyncResult<Void>> noAckHandler = null;
        return doSend(amqpMessage, noAckHandler);
    }

    /**
     * Sends a message on the underlying Proton sender and translates the remote delivery state
     * into a success or failure for the acknowledgement handler.
     *
     * <p>Only {@code Accepted} is reported as success. {@code Rejected}, {@code Modified},
     * {@code Released}, any other state type, and a missing remote state are reported as failures.
     *
     * @param amqpMessage the message to send; when it carries no address, a copy addressed to
     *     {@link #address()} is sent instead
     * @param handler receives the acknowledgement outcome; when {@code null}, failures are only logged
     * @return this sender, for chaining
     */
    private AmqpSender doSend(AmqpMessage amqpMessage, Handler<AsyncResult<Void>> handler) {
        final AmqpMessage outgoing = amqpMessage.address() == null
            ? AmqpMessage.create(amqpMessage).address(address()).build()
            : amqpMessage;

        // Without a caller-supplied handler, a failed acknowledgement is logged rather than dropped.
        final Handler<AsyncResult<Void>> ack = handler != null ? handler : result -> {
            if (result.failed()) {
                LOGGER.warn("Message rejected by remote peer", result.cause());
            }
        };

        final Handler<ProtonDelivery> onDelivery = delivery -> {
            final DeliveryState remoteState = delivery.getRemoteState();
            if (remoteState == null) {
                // No state was reported for the delivery: fail the send instead of throwing
                // a NullPointerException from inside the delivery callback.
                ack.handle(Future.failedFuture("Unknown message state"));
                return;
            }
            switch (remoteState.getType()) {
                case Rejected:
                    ack.handle(Future.failedFuture("message rejected (REJECTED): " + ((Rejected) remoteState).getError()));
                    break;
                case Modified:
                    ack.handle(Future.failedFuture("message rejected (MODIFIED)"));
                    break;
                case Released:
                    ack.handle(Future.failedFuture("message rejected (RELEASED)"));
                    break;
                case Accepted:
                    ack.handle(Future.succeededFuture());
                    break;
                default:
                    ack.handle(Future.failedFuture("Unsupported delivery type: " + remoteState.getType()));
                    break;
            }
        };

        // Account for this message in the cached credit right away so writeQueueFull() reflects it
        // before the send has actually been scheduled.
        synchronized (this) {
            this.remoteCredit--;
        }
        this.connection.runWithTrampoline(unused -> {
            this.sender.send(outgoing.unwrap(), onDelivery);
            // Re-read the credit from the sender after the send, replacing the estimate made above.
            synchronized (this) {
                this.remoteCredit = this.sender.getRemoteCredit();
            }
        });
        return this;
    }

    /**
     * Sets the handler that {@code onClose} notifies when the sender is closed or detached remotely.
     *
     * @param handler the handler to install; {@code null} clears it
     * @return this sender, for chaining
     */
    @Override // io.vertx.amqp.AmqpSender
    public synchronized AmqpSender exceptionHandler(Handler<Throwable> handler) {
        exceptionHandler = handler;
        return this;
    }

    /**
     * Sends a message and exposes the acknowledgement outcome as a future.
     *
     * @param amqpMessage the message to send
     * @return a future completed with the acknowledgement outcome reported by {@code doSend}
     */
    public Future<Void> write(AmqpMessage amqpMessage) {
        // Typed promise instead of a raw one, so no unchecked conversion is needed.
        final Promise<Void> promise = Promise.promise();
        doSend(amqpMessage, promise);
        return promise.future();
    }

    /**
     * Sends a message and reports the acknowledgement outcome to the given handler.
     *
     * @param amqpMessage the message to send
     * @param handler receives the acknowledgement outcome; may be {@code null}, in which case
     *     failures are only logged by {@code doSend}
     */
    public void write(AmqpMessage amqpMessage, Handler<AsyncResult<Void>> handler) {
        this.doSend(amqpMessage, handler);
    }

    /**
     * No-op: the requested size is ignored and this sender is returned unchanged.
     *
     * @param i the requested maximum write queue size (unused)
     * @return this sender, for chaining
     */
    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: setWriteQueueMaxSize */
    public AmqpSender mo162setWriteQueueMaxSize(int i) {
        return this;
    }

    /**
     * Ends the stream; this is the same operation as {@link #close(Handler)}.
     *
     * @param handler receives the outcome of the close; may be {@code null}
     */
    public void end(Handler<AsyncResult<Void>> handler) {
        this.close(handler);
    }

    /**
     * Sets the handler invoked from the Proton send-queue-drain callback.
     *
     * @param handler the handler to install; {@code null} clears it
     * @return this sender, for chaining
     */
    public synchronized AmqpSender drainHandler(Handler<Void> handler) {
        drainHandler = handler;
        return this;
    }

    /**
     * Sends a message and reports the acknowledgement outcome to the given handler.
     *
     * @param amqpMessage the message to send
     * @param handler receives the acknowledgement outcome
     * @return this sender, for chaining
     */
    @Override // io.vertx.amqp.AmqpSender
    public AmqpSender sendWithAck(AmqpMessage amqpMessage, Handler<AsyncResult<Void>> handler) {
        return this.doSend(amqpMessage, handler);
    }

    /**
     * Sends a message and exposes the acknowledgement outcome as a future.
     *
     * @param amqpMessage the message to send
     * @return a future completed with the acknowledgement outcome
     */
    @Override // io.vertx.amqp.AmqpSender
    public Future<Void> sendWithAck(AmqpMessage amqpMessage) {
        // Typed promise instead of a raw one, so no unchecked conversion is needed.
        final Promise<Void> promise = Promise.promise();
        sendWithAck(amqpMessage, promise);
        return promise.future();
    }

    /**
     * Closes the sender.
     *
     * <p>If the sender is already marked closed, the handler is completed successfully at once.
     * Otherwise the sender is marked closed, unregistered from its connection, and the underlying
     * Proton sender is closed via the connection's trampoline; the handler receives the result of
     * that close, or success if the Proton sender was no longer open.
     *
     * @param handler receives the outcome of the close; may be {@code null}
     */
    @Override // io.vertx.amqp.AmqpSender
    public void close(Handler<AsyncResult<Void>> handler) {
        final Handler<AsyncResult<Void>> done = handler != null ? handler : ignored -> {
        };

        // Hold the monitor only for the state transition: the user callback and the connection
        // calls below must not run while this sender's lock is held.
        final boolean alreadyClosed;
        synchronized (this) {
            alreadyClosed = this.closed;
            this.closed = true;
        }
        if (alreadyClosed) {
            done.handle(Future.succeededFuture());
            return;
        }

        this.connection.unregister(this);
        this.connection.runWithTrampoline(unused -> {
            if (!this.sender.isOpen()) {
                done.handle(Future.succeededFuture());
                return;
            }
            try {
                // Replaces the close handler installed by the constructor so that the result of this
                // locally initiated close goes to the caller.
                this.sender.closeHandler(closeResult -> {
                    done.handle(closeResult.mapEmpty());
                }).close();
            } catch (Exception e) {
                done.handle(Future.failedFuture(e));
            }
        });
    }

    /**
     * Closes the sender and exposes the outcome as a future.
     *
     * @return a future completed with the outcome of {@link #close(Handler)}
     */
    @Override // io.vertx.amqp.AmqpSender
    public Future<Void> close() {
        // Typed promise instead of a raw one, so no unchecked conversion is needed.
        final Promise<Void> promise = Promise.promise();
        close(promise);
        return promise.future();
    }

    /**
     * Returns the remote address reported by the underlying Proton sender.
     *
     * @return the sender's remote address
     */
    @Override // io.vertx.amqp.AmqpSender
    public String address() {
        final String remoteAddress = sender.getRemoteAddress();
        return remoteAddress;
    }

    /**
     * Returns the remote credit as currently reported by the underlying Proton sender (read
     * directly from the sender, not from the cached field).
     *
     * @return the sender's remote credit
     */
    @Override // io.vertx.amqp.AmqpSender
    public long remainingCredits() {
        final long credits = sender.getRemoteCredit();
        return credits;
    }

    /**
     * Synthetic bridge exposing {@link #drainHandler(Handler)} with a {@link WriteStream} return
     * type; it only casts the argument and delegates.
     */
    /* renamed from: drainHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ WriteStream m169drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    /**
     * Synthetic bridge for the erased {@code write(Object, Handler)} signature; it casts both
     * arguments and delegates to {@link #write(AmqpMessage, Handler)}.
     */
    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((AmqpMessage) obj, (Handler<AsyncResult<Void>>) handler);
    }

    /**
     * Synthetic bridge exposing {@link #exceptionHandler(Handler)} with a {@link WriteStream}
     * return type; it only casts the argument and delegates.
     */
    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ WriteStream mo163exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /**
     * Synthetic bridge exposing {@link #exceptionHandler(Handler)} with a {@link StreamBase}
     * return type; it only casts the argument and delegates.
     */
    @Override // io.vertx.amqp.AmqpSender
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo164exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
