package org.apache.hedwig.client.handlers;

import java.util.TimerTask;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.data.MessageConsumeData;
import org.apache.hedwig.client.netty.HChannelManager;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.VarArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/client/handlers/MessageConsumeCallback.class */
/**
 * Callback invoked with the outcome of the client's message handler consuming a
 * delivered message.
 *
 * <p>On success the message is reported as consumed to the
 * {@link SubscribeResponseHandler} registered for the topic subscriber. On failure
 * a {@link MessageConsumeRetryTask} is scheduled through the channel manager so
 * that delivery of the same message is attempted again.
 *
 * <p>Both callbacks expect the context object to be a {@link MessageConsumeData};
 * any other type results in a {@link ClassCastException}.
 */
public class MessageConsumeCallback implements Callback<Void> {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumeCallback.class);

    private final HChannelManager channelManager;

    // Delay passed to HChannelManager#schedule for a retry; taken from
    // ClientConfiguration#getMessageConsumeRetryWaitTime().
    // NOTE(review): the time unit is not visible in this file - confirm against the configuration class.
    private final long consumeRetryWaitTime;

    /**
     * Timer task that re-delivers a message whose consumption previously failed.
     *
     * <p>Deliberately a non-static inner class: it needs the enclosing callback's
     * channel manager to look up the current subscribe response handler at the
     * time the task runs.
     */
    class MessageConsumeRetryTask extends TimerTask {
        private final MessageConsumeData messageConsumeData;

        /**
         * @param messageConsumeData the topic subscriber / message pair to deliver again
         */
        public MessageConsumeRetryTask(MessageConsumeData messageConsumeData) {
            this.messageConsumeData = messageConsumeData;
        }

        /**
         * Hands the message back to the subscribe response handler for asynchronous
         * delivery, or logs a warning when no handler currently holds the subscription.
         */
        @Override
        public void run() {
            SubscribeResponseHandler handler =
                    MessageConsumeCallback.this.findActiveHandler(this.messageConsumeData);
            if (handler == null) {
                logger.warn("No subscription {} found to retry delivering message {}.",
                        VarArgs.va(this.messageConsumeData.topicSubscriber,
                                MessageIdUtils.msgIdToReadableString(this.messageConsumeData.msg.getMsgId())));
                return;
            }
            handler.asyncMessageDeliver(this.messageConsumeData.topicSubscriber, this.messageConsumeData.msg);
        }
    }

    /**
     * @param clientConfiguration source of the consume-retry wait time
     * @param hChannelManager     used to look up subscribe response handlers and to schedule retries
     */
    public MessageConsumeCallback(ClientConfiguration clientConfiguration, HChannelManager hChannelManager) {
        this.channelManager = hChannelManager;
        this.consumeRetryWaitTime = clientConfiguration.getMessageConsumeRetryWaitTime();
    }

    /**
     * Reports the message as consumed to the handler registered for its topic subscriber.
     * Logs a warning and does nothing else when no handler currently holds the subscription.
     *
     * @param obj    the {@link MessageConsumeData} describing the consumed message
     * @param result unused
     */
    @Override
    public void operationFinished(Object obj, Void result) {
        MessageConsumeData messageConsumeData = (MessageConsumeData) obj;
        SubscribeResponseHandler handler = findActiveHandler(messageConsumeData);
        if (handler == null) {
            logger.warn("No subscription {} found to consume message {}.",
                    VarArgs.va(messageConsumeData.topicSubscriber,
                            MessageIdUtils.msgIdToReadableString(messageConsumeData.msg.getMsgId())));
            return;
        }
        handler.messageConsumed(messageConsumeData.topicSubscriber, messageConsumeData.msg);
    }

    /**
     * Logs the failure (including its cause) and schedules a retry of the delivery.
     *
     * @param obj             the {@link MessageConsumeData} describing the message that failed
     * @param pubSubException the reason the message handler failed
     */
    @Override
    public void operationFailed(Object obj, PubSubException pubSubException) {
        MessageConsumeData messageConsumeData = (MessageConsumeData) obj;
        // The exception is passed as a trailing argument without a placeholder so that
        // SLF4J records it as the throwable (stack trace) instead of dropping the cause.
        logger.error("Message was not consumed successfully by client MessageHandler: {}",
                messageConsumeData, pubSubException);
        this.channelManager.schedule(new MessageConsumeRetryTask(messageConsumeData), this.consumeRetryWaitTime);
    }

    /**
     * Looks up the subscribe response handler for the given message's topic subscriber.
     *
     * @return the handler if one exists and reports having the subscription, otherwise {@code null}
     */
    private SubscribeResponseHandler findActiveHandler(MessageConsumeData messageConsumeData) {
        SubscribeResponseHandler handler =
                this.channelManager.getSubscribeResponseHandler(messageConsumeData.topicSubscriber);
        if (handler != null && handler.hasSubscription(messageConsumeData.topicSubscriber)) {
            return handler;
        }
        return null;
    }
}
