package org.apache.hedwig.client.netty;

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.handlers.PubSubCallback;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/netty/HedwigPublisher.class */
public class HedwigPublisher implements Publisher {
    private static Logger logger = LoggerFactory.getLogger(HedwigPublisher.class);
    private final HChannelManager channelManager;

    /* loaded from: input_file:org/apache/hedwig/client/netty/HedwigPublisher$PublishResponseCallbackAdapter.class */
    private static class PublishResponseCallbackAdapter implements Callback<PubSubProtocol.ResponseBody> {
        private final Callback<PubSubProtocol.PublishResponse> delegate;

        private PublishResponseCallbackAdapter(Callback<PubSubProtocol.PublishResponse> callback) {
            this.delegate = callback;
        }

        @Override // org.apache.hedwig.util.Callback
        public void operationFinished(Object obj, PubSubProtocol.ResponseBody responseBody) {
            if (null == responseBody) {
                this.delegate.operationFinished(obj, null);
            } else {
                this.delegate.operationFinished(obj, responseBody.getPublishResponse());
            }
        }

        @Override // org.apache.hedwig.util.Callback
        public void operationFailed(Object obj, PubSubException pubSubException) {
            this.delegate.operationFailed(obj, pubSubException);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public HedwigPublisher(HedwigClientImpl hedwigClientImpl) {
        this.channelManager = hedwigClientImpl.getHChannelManager();
    }

    @Override // org.apache.hedwig.client.api.Publisher
    public PubSubProtocol.PublishResponse publish(ByteString byteString, PubSubProtocol.Message message) throws PubSubException.CouldNotConnectException, PubSubException.ServiceDownException {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling a sync publish for topic: {}, msg: {}.", byteString.toStringUtf8(), message);
        }
        PubSubData pubSubData = new PubSubData(byteString, message, null, PubSubProtocol.OperationType.PUBLISH, null, null, null);
        synchronized (pubSubData) {
            PubSubCallback pubSubCallback = new PubSubCallback(pubSubData);
            asyncPublishWithResponseImpl(byteString, message, pubSubCallback, null);
            while (!pubSubData.isDone) {
                try {
                    pubSubData.wait();
                } catch (InterruptedException e) {
                    throw new PubSubException.ServiceDownException("Interrupted Exception while waiting for async publish call");
                }
            }
            if (pubSubCallback.getIsCallSuccessful()) {
                PubSubProtocol.ResponseBody responseBody = pubSubCallback.getResponseBody();
                if (null == responseBody) {
                    return null;
                }
                return responseBody.hasPublishResponse() ? responseBody.getPublishResponse() : null;
            }
            PubSubException.CouldNotConnectException failureException = pubSubCallback.getFailureException();
            if (failureException == null) {
                logger.error("Sync Publish operation failed but no PubSubException was passed!");
                throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
            }
            if (failureException instanceof PubSubException.CouldNotConnectException) {
                throw failureException;
            }
            if (failureException instanceof PubSubException.ServiceDownException) {
                throw ((PubSubException.ServiceDownException) failureException);
            }
            logger.error("Unexpected exception type when a sync publish operation failed: ", failureException);
            throw new PubSubException.ServiceDownException("Server ack response to publish request is not successful");
        }
    }

    @Override // org.apache.hedwig.client.api.Publisher
    public void asyncPublish(ByteString byteString, PubSubProtocol.Message message, Callback<Void> callback, Object obj) {
        asyncPublishWithResponseImpl(byteString, message, new VoidCallbackAdapter(callback), obj);
    }

    @Override // org.apache.hedwig.client.api.Publisher
    public void asyncPublishWithResponse(ByteString byteString, PubSubProtocol.Message message, Callback<PubSubProtocol.PublishResponse> callback, Object obj) {
        asyncPublishWithResponseImpl(byteString, message, new PublishResponseCallbackAdapter(callback), obj);
    }

    private void asyncPublishWithResponseImpl(ByteString byteString, PubSubProtocol.Message message, Callback<PubSubProtocol.ResponseBody> callback, Object obj) {
        if (logger.isDebugEnabled()) {
            logger.debug("Calling an async publish for topic: {}, msg: {}.", byteString.toStringUtf8(), message);
        }
        this.channelManager.submitOp(new PubSubData(byteString, message, null, PubSubProtocol.OperationType.PUBLISH, null, callback, obj));
    }
}
