package akka.stream.alpakka.amqp.impl;

import akka.Done$;
import akka.stream.alpakka.amqp.AmqpReplyToSinkSettings;
import akka.stream.alpakka.amqp.WriteMessage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import scala.$less$colon$less$;
import scala.Option;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;

/* compiled from: AmqpReplyToSinkStage.scala */
/* loaded from: input_file:akka/stream/alpakka/amqp/impl/AmqpReplyToSinkStage$$anon$1.class */
public final class AmqpReplyToSinkStage$$anon$1 extends GraphStageLogic implements AmqpConnectorLogic {
    private final AmqpReplyToSinkSettings settings;
    private Connection akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection;
    private Channel channel;
    private AsyncCallback<Throwable> shutdownCallback;
    private ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
    private volatile byte bitmap$0;
    private final /* synthetic */ AmqpReplyToSinkStage $outer;
    private final Promise promise$1;

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public final void preStart() {
        preStart();
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public Connection akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection() {
        return this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection_$eq(Connection connection) {
        this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection = connection;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public Channel channel() {
        return this.channel;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void channel_$eq(Channel channel) {
        this.channel = channel;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [akka.stream.alpakka.amqp.impl.AmqpReplyToSinkStage$$anon$1] */
    private AsyncCallback<Throwable> shutdownCallback$lzycompute() {
        AsyncCallback<Throwable> shutdownCallback;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                shutdownCallback = shutdownCallback();
                this.shutdownCallback = shutdownCallback;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.shutdownCallback;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public AsyncCallback<Throwable> shutdownCallback() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? shutdownCallback$lzycompute() : this.shutdownCallback;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [akka.stream.alpakka.amqp.impl.AmqpReplyToSinkStage$$anon$1] */
    private ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzycompute() {
        ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener = akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener();
                this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener = akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzycompute() : this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public AmqpReplyToSinkSettings settings() {
        return this.settings;
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void whenConnected() {
        pull(this.$outer.in());
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void postStop() {
        this.promise$1.tryFailure(new RuntimeException("stage stopped unexpectedly"));
        postStop();
    }

    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void onFailure(Throwable th) {
        this.promise$1.tryFailure(th);
        onFailure(th);
    }

    public /* synthetic */ AmqpReplyToSinkStage akka$stream$alpakka$amqp$impl$AmqpReplyToSinkStage$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public AmqpReplyToSinkStage$$anon$1(AmqpReplyToSinkStage amqpReplyToSinkStage, final Promise promise) {
        super(amqpReplyToSinkStage.m22shape());
        if (amqpReplyToSinkStage == null) {
            throw null;
        }
        this.$outer = amqpReplyToSinkStage;
        this.promise$1 = promise;
        AmqpConnectorLogic.$init$(this);
        this.settings = amqpReplyToSinkStage.akka$stream$alpakka$amqp$impl$AmqpReplyToSinkStage$$settings;
        setHandler(amqpReplyToSinkStage.in(), new InHandler(this, promise) { // from class: akka.stream.alpakka.amqp.impl.AmqpReplyToSinkStage$$anon$1$$anon$2
            private final /* synthetic */ AmqpReplyToSinkStage$$anon$1 $outer;
            private final Promise promise$1;

            public void onUpstreamFailure(Throwable th) {
                this.promise$1.failure(th);
                InHandler.onUpstreamFailure$(this, th);
            }

            public void onUpstreamFinish() {
                this.promise$1.success(Done$.MODULE$);
                InHandler.onUpstreamFinish$(this);
            }

            public void onPush() {
                WriteMessage writeMessage = (WriteMessage) this.$outer.grab(this.$outer.akka$stream$alpakka$amqp$impl$AmqpReplyToSinkStage$$anon$$$outer().in());
                Option map = writeMessage.properties().map(basicProperties -> {
                    return basicProperties.getReplyTo();
                });
                if (map.isDefined()) {
                    this.$outer.channel().basicPublish((String) writeMessage.routingKey().getOrElse(() -> {
                        return "";
                    }), (String) map.get(), writeMessage.mandatory(), writeMessage.immediate(), (AMQP.BasicProperties) writeMessage.properties().orNull($less$colon$less$.MODULE$.refl()), (byte[]) writeMessage.bytes().toArray(ClassTag$.MODULE$.Byte()));
                } else if (this.$outer.settings().failIfReplyToMissing()) {
                    this.$outer.onFailure(new RuntimeException("Reply-to header was not set"));
                }
                this.$outer.tryPull(this.$outer.akka$stream$alpakka$amqp$impl$AmqpReplyToSinkStage$$anon$$$outer().in());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.promise$1 = promise;
                InHandler.$init$(this);
            }
        });
    }
}
