package com.netflix.spinnaker.kork.pubsub.aws;

import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.aws.ARN;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonMessageAcknowledger;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonPubsubMessageHandler;
import com.netflix.spinnaker.kork.pubsub.aws.config.AmazonPubsubConfig;
import com.netflix.spinnaker.kork.pubsub.aws.config.AmazonPubsubProperties;
import com.netflix.spinnaker.kork.pubsub.model.PubsubSubscriber;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/netflix/spinnaker/kork/pubsub/aws/SQSSubscriber.class */
public class SQSSubscriber implements Runnable, PubsubSubscriber {
    private static final Logger log = LoggerFactory.getLogger(SQSSubscriber.class);
    private final AmazonSNS amazonSNS;
    private final AmazonSQS amazonSQS;
    private final AmazonPubsubProperties.AmazonPubsubSubscription subscription;
    private final AmazonPubsubMessageHandler messageHandler;
    private final AmazonMessageAcknowledger messageAcknowledger;
    private final Registry registry;
    private final Supplier<Boolean> isEnabled;
    private final ARN queueARN;
    private final ARN topicARN;
    private AmazonSubscriptionInformation subscriptionInfo;

    public SQSSubscriber(AmazonPubsubProperties.AmazonPubsubSubscription amazonPubsubSubscription, AmazonPubsubMessageHandler amazonPubsubMessageHandler, AmazonMessageAcknowledger amazonMessageAcknowledger, AmazonSNS amazonSNS, AmazonSQS amazonSQS, Supplier<Boolean> supplier, Registry registry) {
        this.subscription = amazonPubsubSubscription;
        this.messageHandler = amazonPubsubMessageHandler;
        this.messageAcknowledger = amazonMessageAcknowledger;
        this.amazonSNS = amazonSNS;
        this.amazonSQS = amazonSQS;
        this.isEnabled = supplier;
        this.registry = registry;
        this.queueARN = new ARN(amazonPubsubSubscription.getQueueARN());
        this.topicARN = new ARN(amazonPubsubSubscription.getTopicARN());
    }

    public String getWorkerName() {
        return this.queueARN.getArn() + "/" + SQSSubscriber.class.getSimpleName();
    }

    public String getPubsubSystem() {
        return AmazonPubsubConfig.SYSTEM;
    }

    public String getSubscriptionName() {
        return this.subscription.getName();
    }

    public String getName() {
        return getSubscriptionName();
    }

    @Override // java.lang.Runnable
    public void run() {
        log.info("Starting {}", getWorkerName());
        try {
            initializeQueue();
            while (true) {
                try {
                    listenForMessages();
                } catch (QueueDoesNotExistException e) {
                    log.warn("Queue {} does not exist, recreating", this.queueARN, e);
                    initializeQueue();
                } catch (Exception e2) {
                    log.error("Unexpected error running {}, restarting worker", getWorkerName(), e2);
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e3) {
                        log.error("Thread {} interrupted while sleeping", getWorkerName(), e3);
                    }
                }
            }
        } catch (Exception e4) {
            log.error("Error initializing queue {}", this.queueARN, e4);
            throw e4;
        }
    }

    private void initializeQueue() {
        String ensureQueueExists = PubSubUtils.ensureQueueExists(this.amazonSQS, this.queueARN, this.topicARN, this.subscription.getSqsMessageRetentionPeriodSeconds());
        PubSubUtils.subscribeToTopic(this.amazonSNS, this.topicARN, this.queueARN);
        this.subscriptionInfo = AmazonSubscriptionInformation.builder().amazonSNS(this.amazonSNS).amazonSQS(this.amazonSQS).properties(this.subscription).queueUrl(ensureQueueExists).build();
    }

    private void listenForMessages() {
        while (this.isEnabled.get().booleanValue()) {
            ReceiveMessageResult receiveMessage = this.amazonSQS.receiveMessage(new ReceiveMessageRequest(this.subscriptionInfo.queueUrl).withMaxNumberOfMessages(Integer.valueOf(this.subscription.getMaxNumberOfMessages())).withVisibilityTimeout(Integer.valueOf(this.subscription.getVisibilityTimeout())).withWaitTimeSeconds(Integer.valueOf(this.subscription.getWaitTimeSeconds())).withMessageAttributeNames(new String[]{"All"}));
            if (receiveMessage.getMessages().isEmpty()) {
                log.debug("Received no messages for queue {}", this.queueARN);
            } else {
                receiveMessage.getMessages().forEach(this::handleMessage);
            }
        }
    }

    private void handleMessage(Message message) {
        Exception exc = null;
        try {
            this.messageHandler.handleMessage(message);
            getSuccessCounter().increment();
        } catch (Exception e) {
            log.error("failed to process message {}", message, e);
            getErrorCounter(e).increment();
            exc = e;
        }
        if (exc == null) {
            this.messageAcknowledger.ack(this.subscriptionInfo, message);
        } else {
            this.messageAcknowledger.nack(this.subscriptionInfo, message);
        }
    }

    private Counter getSuccessCounter() {
        return this.registry.counter("pubsub.amazon.processed", new String[]{"subscription", getSubscriptionName()});
    }

    private Counter getErrorCounter(Exception exc) {
        return this.registry.counter("pubsub.amazon.failed", new String[]{"subscription", getSubscriptionName(), "exceptionClass", exc.getClass().getSimpleName()});
    }
}
