package com.azure.spring.messaging.storage.queue.core;

import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.core.SendOperation;
import com.azure.spring.messaging.storage.queue.core.factory.StorageQueueClientFactory;
import com.azure.spring.messaging.storage.queue.implementation.StorageQueueHelper;
import com.azure.spring.messaging.storage.queue.support.converter.StorageQueueMessageConverter;
import com.azure.storage.queue.QueueAsyncClient;
import com.azure.storage.queue.models.QueueMessageItem;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.NonNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/spring/messaging/storage/queue/core/StorageQueueTemplate.class */
public class StorageQueueTemplate implements SendOperation {
    private static final Logger LOG = LoggerFactory.getLogger(StorageQueueTemplate.class);
    private static final String MSG_FAIL_CHECKPOINT = "Failed to checkpoint %s in storage queue '%s'";
    private static final String MSG_SUCCESS_CHECKPOINT = "Checkpointed %s in storage queue '%s'";
    private final StorageQueueClientFactory storageQueueClientFactory;
    private StorageQueueMessageConverter messageConverter = new StorageQueueMessageConverter();
    private Class<?> messagePayloadType = byte[].class;

    public StorageQueueTemplate(@NonNull StorageQueueClientFactory storageQueueClientFactory) {
        this.storageQueueClientFactory = storageQueueClientFactory;
        LOG.info("StorageQueueTemplate started with default properties {}", buildProperties());
    }

    public <T> Mono<Void> sendAsync(String str, @NonNull Message<T> message) {
        Assert.hasText(str, "queueName can't be null or empty");
        QueueMessageItem queueMessageItem = (QueueMessageItem) this.messageConverter.fromMessage(message, QueueMessageItem.class);
        QueueAsyncClient createQueueClient = this.storageQueueClientFactory.createQueueClient(str);
        Assert.notNull(queueMessageItem, "queueMessageItem can't be null");
        return createQueueClient.sendMessage(queueMessageItem.getBody().toString()).then();
    }

    public Mono<Message<?>> receiveAsync(String str, Duration duration) {
        Assert.hasText(str, "queueName can't be null or empty");
        QueueAsyncClient createQueueClient = this.storageQueueClientFactory.createQueueClient(str);
        return createQueueClient.receiveMessages(1, duration).next().flatMap(queueMessageItem -> {
            HashMap hashMap = new HashMap();
            hashMap.put("azure_checkpointer", new AzureCheckpointer(() -> {
                return checkpoint(createQueueClient, queueMessageItem);
            }));
            return Mono.justOrEmpty(this.messageConverter.toMessage(queueMessageItem, new MessageHeaders(hashMap), this.messagePayloadType));
        });
    }

    private Mono<Void> checkpoint(QueueAsyncClient queueAsyncClient, QueueMessageItem queueMessageItem) {
        return queueAsyncClient.deleteMessage(queueMessageItem.getMessageId(), queueMessageItem.getPopReceipt()).doOnSuccess(r8 -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug(buildCheckpointSuccessMessage(queueMessageItem, queueAsyncClient.getQueueName()));
            }
        }).doOnError(th -> {
            if (LOG.isWarnEnabled()) {
                LOG.warn(buildCheckpointFailMessage(queueMessageItem, queueAsyncClient.getQueueName()), th);
            }
        });
    }

    private Map<String, Object> buildProperties() {
        HashMap hashMap = new HashMap();
        hashMap.put("messagePayloadType", this.messagePayloadType);
        return hashMap;
    }

    private String buildCheckpointFailMessage(QueueMessageItem queueMessageItem, String str) {
        return String.format(MSG_FAIL_CHECKPOINT, StorageQueueHelper.toString(queueMessageItem), str);
    }

    private String buildCheckpointSuccessMessage(QueueMessageItem queueMessageItem, String str) {
        return String.format(MSG_SUCCESS_CHECKPOINT, StorageQueueHelper.toString(queueMessageItem), str);
    }

    public StorageQueueMessageConverter getMessageConverter() {
        return this.messageConverter;
    }

    public void setMessageConverter(StorageQueueMessageConverter storageQueueMessageConverter) {
        this.messageConverter = storageQueueMessageConverter;
    }

    public Class<?> getMessagePayloadType() {
        return this.messagePayloadType;
    }

    public void setMessagePayloadType(Class<?> cls) {
        this.messagePayloadType = cls;
        LOG.info("StorageQueueTemplate messagePayloadType becomes: {}", this.messagePayloadType);
    }
}
