package org.springframework.cloud.gcp.pubsub.core.subscriber;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.protobuf.Empty;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.gcp.pubsub.support.AcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.BasicAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.PubSubSubscriptionUtils;
import org.springframework.cloud.gcp.pubsub.support.SubscriberFactory;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.converter.ConvertedBasicAcknowledgeablePubsubMessage;
import org.springframework.cloud.gcp.pubsub.support.converter.PubSubMessageConverter;
import org.springframework.cloud.gcp.pubsub.support.converter.SimplePubSubMessageConverter;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;

/* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate.class */
public class PubSubSubscriberTemplate implements PubSubSubscriberOperations, DisposableBean {
    /** Factory used to create streaming subscribers, pull requests and the subscriber stub. */
    private final SubscriberFactory subscriberFactory;
    /** Stub obtained from the factory in the constructor; used for pull, acknowledge and modify-ack-deadline calls and closed in {@code destroy()}. */
    private final SubscriberStub subscriberStub;
    /** Converter applied to message payloads; starts out as a {@code SimplePubSubMessageConverter}. */
    private PubSubMessageConverter pubSubMessageConverter = new SimplePubSubMessageConverter();
    /** Single-thread executor created by this template; shut down in {@code destroy()}. */
    private final ExecutorService defaultAckExecutor = Executors.newSingleThreadExecutor();
    /** Executor on which the callbacks of batched ack / deadline operations run; {@code defaultAckExecutor} unless replaced through {@code setAckExecutor}. */
    private Executor ackExecutor = this.defaultAckExecutor;

    /* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate$AbstractBasicAcknowledgeablePubsubMessage.class */
    /**
     * Common base for acknowledgeable messages: stores the subscription a
     * message belongs to together with the message itself. Subclasses supply
     * the ack / nack behaviour.
     */
    private abstract static class AbstractBasicAcknowledgeablePubsubMessage implements BasicAcknowledgeablePubsubMessage {
        private final ProjectSubscriptionName projectSubscriptionName;
        private final PubsubMessage message;

        /**
         * @param subscriptionName subscription the message is associated with
         * @param receivedMessage the wrapped Pub/Sub message
         */
        AbstractBasicAcknowledgeablePubsubMessage(ProjectSubscriptionName subscriptionName, PubsubMessage receivedMessage) {
            this.message = receivedMessage;
            this.projectSubscriptionName = subscriptionName;
        }

        /** @return the subscription name given at construction */
        @Override
        public ProjectSubscriptionName getProjectSubscriptionName() {
            return projectSubscriptionName;
        }

        /** @return the wrapped Pub/Sub message given at construction */
        @Override
        public PubsubMessage getPubsubMessage() {
            return message;
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate$ConvertedPulledAcknowledgeablePubsubMessage.class */
    /**
     * A pulled message paired with its payload already converted to {@code T}.
     * Subscription name, message and ack id are copied from the wrapped message.
     *
     * @param <T> type of the converted payload
     */
    private class ConvertedPulledAcknowledgeablePubsubMessage<T> extends PulledAcknowledgeablePubsubMessage implements ConvertedAcknowledgeablePubsubMessage<T> {
        private final T payload;

        /**
         * @param source pulled message whose subscription, message and ack id are reused
         * @param convertedPayload payload produced by the message converter
         */
        ConvertedPulledAcknowledgeablePubsubMessage(AcknowledgeablePubsubMessage source, T convertedPayload) {
            super(source.getProjectSubscriptionName(), source.getPubsubMessage(), source.getAckId());
            this.payload = convertedPayload;
        }

        /** @return the converted payload supplied at construction */
        @Override
        public T getPayload() {
            return payload;
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate$ConvertedPushedAcknowledgeablePubsubMessage.class */
    /**
     * A message delivered to a streaming subscriber, paired with its payload
     * already converted to {@code T}.
     *
     * @param <T> type of the converted payload
     */
    private static class ConvertedPushedAcknowledgeablePubsubMessage<T> extends PushedAcknowledgeablePubsubMessage implements ConvertedBasicAcknowledgeablePubsubMessage<T> {
        private final T payload;

        /**
         * @param subscriptionName subscription the message was received on
         * @param receivedMessage the wrapped Pub/Sub message
         * @param convertedPayload payload produced by the message converter
         * @param replyConsumer consumer used to ack or nack the message
         */
        ConvertedPushedAcknowledgeablePubsubMessage(ProjectSubscriptionName subscriptionName, PubsubMessage receivedMessage, T convertedPayload, AckReplyConsumer replyConsumer) {
            super(subscriptionName, receivedMessage, replyConsumer);
            this.payload = convertedPayload;
        }

        /** @return the converted payload supplied at construction */
        @Override
        public T getPayload() {
            return payload;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate$PulledAcknowledgeablePubsubMessage.class */
    /**
     * A message obtained through a pull request. Ack, nack and deadline changes
     * are delegated to the enclosing template using this message's ack id.
     */
    public class PulledAcknowledgeablePubsubMessage extends AbstractBasicAcknowledgeablePubsubMessage implements AcknowledgeablePubsubMessage {
        private final String ackId;

        /**
         * @param subscriptionName subscription the message was pulled from
         * @param pulledMessage the wrapped Pub/Sub message
         * @param ackIdentifier ack id returned with the message
         */
        PulledAcknowledgeablePubsubMessage(ProjectSubscriptionName subscriptionName, PubsubMessage pulledMessage, String ackIdentifier) {
            super(subscriptionName, pulledMessage);
            this.ackId = ackIdentifier;
        }

        /** @return the ack id supplied at construction */
        @Override
        public String getAckId() {
            return ackId;
        }

        /** Acknowledges just this message through the enclosing template. */
        @Override
        public ListenableFuture<Void> ack() {
            Collection<AcknowledgeablePubsubMessage> onlyThis = Collections.singleton(this);
            return PubSubSubscriberTemplate.this.ack(onlyThis);
        }

        /** Negatively acknowledges this message by setting its ack deadline to zero. */
        @Override
        public ListenableFuture<Void> nack() {
            return modifyAckDeadline(0);
        }

        /**
         * Changes the ack deadline of just this message through the enclosing template.
         *
         * @param i new ack deadline in seconds
         */
        @Override
        public ListenableFuture<Void> modifyAckDeadline(int i) {
            Collection<AcknowledgeablePubsubMessage> onlyThis = Collections.singleton(this);
            return PubSubSubscriberTemplate.this.modifyAckDeadline(onlyThis, i);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("PulledAcknowledgeablePubsubMessage{projectId='");
            text.append(getProjectSubscriptionName().getProject());
            text.append("', subscriptionName='");
            text.append(getProjectSubscriptionName().getSubscription());
            text.append("', message=");
            text.append(getPubsubMessage());
            text.append(", ackId='");
            text.append(ackId);
            text.append("'}");
            return text.toString();
        }
    }

    /* loaded from: input_file:org/springframework/cloud/gcp/pubsub/core/subscriber/PubSubSubscriberTemplate$PushedAcknowledgeablePubsubMessage.class */
    /**
     * A message delivered to a streaming subscriber. Acknowledgement is
     * performed through the {@link AckReplyConsumer} that arrived with the
     * message; the outcome is reported through an already-completed future.
     */
    private static class PushedAcknowledgeablePubsubMessage extends AbstractBasicAcknowledgeablePubsubMessage {
        private final AckReplyConsumer ackReplyConsumer;

        /**
         * @param projectSubscriptionName subscription the message was received on
         * @param pubsubMessage the wrapped Pub/Sub message
         * @param ackReplyConsumer consumer used to ack or nack the message
         */
        PushedAcknowledgeablePubsubMessage(ProjectSubscriptionName projectSubscriptionName, PubsubMessage pubsubMessage, AckReplyConsumer ackReplyConsumer) {
            super(projectSubscriptionName, pubsubMessage);
            this.ackReplyConsumer = ackReplyConsumer;
        }

        /**
         * Acknowledges the message.
         *
         * @return a completed future; it carries the exception if the reply consumer threw
         */
        @Override
        public ListenableFuture<Void> ack() {
            // A lambda (not a method reference) keeps a null consumer failing inside the helper's try block.
            return completedAfter(() -> this.ackReplyConsumer.ack());
        }

        /**
         * Negatively acknowledges the message.
         *
         * @return a completed future; it carries the exception if the reply consumer threw
         */
        @Override
        public ListenableFuture<Void> nack() {
            return completedAfter(() -> this.ackReplyConsumer.nack());
        }

        /** Runs the reply synchronously and converts its outcome into a completed future. */
        private static ListenableFuture<Void> completedAfter(Runnable reply) {
            SettableListenableFuture<Void> result = new SettableListenableFuture<>();
            try {
                reply.run();
                result.set(null);
            } catch (Throwable ex) {
                // Failures are reported through the future instead of being thrown to the caller.
                result.setException(ex);
            }
            return result;
        }

        @Override
        public String toString() {
            return "PushedAcknowledgeablePubsubMessage{projectId='" + getProjectSubscriptionName().getProject() + "', subscriptionName='" + getProjectSubscriptionName().getSubscription() + "', message=" + getPubsubMessage() + '}';
        }
    }

    /**
     * Creates a template backed by the given factory and immediately obtains a
     * subscriber stub from it.
     *
     * @param subscriberFactory factory for subscribers, pull requests and the stub; must not be null
     * @throws IllegalArgumentException if {@code subscriberFactory} is null
     */
    public PubSubSubscriberTemplate(SubscriberFactory subscriberFactory) {
        Assert.notNull(subscriberFactory, "The subscriberFactory can't be null.");
        this.subscriberFactory = subscriberFactory;
        this.subscriberStub = subscriberFactory.createSubscriberStub();
    }

    /** @return the converter currently used to convert message payloads */
    public PubSubMessageConverter getMessageConverter() {
        return pubSubMessageConverter;
    }

    /**
     * Replaces the converter used to convert message payloads.
     *
     * @param pubSubMessageConverter the new converter; must not be null
     * @throws IllegalArgumentException if the converter is null
     */
    public void setMessageConverter(PubSubMessageConverter pubSubMessageConverter) {
        Assert.notNull(pubSubMessageConverter, "The pubSubMessageConverter can't be null.");
        PubSubMessageConverter replacement = pubSubMessageConverter;
        this.pubSubMessageConverter = replacement;
    }

    /**
     * Replaces the executor on which callbacks of batched ack / deadline
     * operations are run.
     *
     * @param executor the new executor; must not be null
     * @throws IllegalArgumentException if the executor is null
     */
    public void setAckExecutor(Executor executor) {
        Assert.notNull(executor, "ackExecutor can't be null.");
        Executor replacement = executor;
        this.ackExecutor = replacement;
    }

    /**
     * Creates a subscriber for the subscription with the given receiver and
     * starts it asynchronously.
     *
     * @param str name of the subscription; must contain text
     * @param messageReceiver receiver handed to the created subscriber; must not be null
     * @return the subscriber, after {@code startAsync()} has been called on it
     * @throws IllegalArgumentException if either argument fails validation
     */
    @Override
    @Deprecated
    public Subscriber subscribe(String str, MessageReceiver messageReceiver) {
        Assert.hasText(str, "The subscription can't be null or empty.");
        Assert.notNull(messageReceiver, "The messageReceiver can't be null.");

        Subscriber startedSubscriber = this.subscriberFactory.createSubscriber(str, messageReceiver);
        startedSubscriber.startAsync();
        return startedSubscriber;
    }

    /**
     * Creates a subscriber that wraps every received message in a
     * {@link BasicAcknowledgeablePubsubMessage}, passes it to the consumer, and
     * starts the subscriber asynchronously.
     *
     * @param str name of the subscription; must contain text
     * @param consumer callback invoked for each received message; must not be null
     * @return the subscriber, after {@code startAsync()} has been called on it
     * @throws IllegalArgumentException if either argument fails validation
     */
    @Override
    public Subscriber subscribe(String str, Consumer<BasicAcknowledgeablePubsubMessage> consumer) {
        Assert.notNull(consumer, "The messageConsumer can't be null.");
        // Same subscription check as the MessageReceiver overload, so a blank name fails fast here.
        Assert.hasText(str, "The subscription can't be null or empty.");

        MessageReceiver receiver = (pubsubMessage, ackReplyConsumer) -> consumer.accept(
                new PushedAcknowledgeablePubsubMessage(
                        PubSubSubscriptionUtils.toProjectSubscriptionName(str, this.subscriberFactory.getProjectId()),
                        pubsubMessage,
                        ackReplyConsumer));

        Subscriber subscriber = this.subscriberFactory.createSubscriber(str, receiver);
        subscriber.startAsync();
        return subscriber;
    }

    /**
     * Creates a subscriber that converts the payload of every received message
     * to {@code cls} using the configured message converter, wraps the result
     * in a {@link ConvertedBasicAcknowledgeablePubsubMessage}, passes it to the
     * consumer, and starts the subscriber asynchronously.
     *
     * @param str name of the subscription; must contain text
     * @param consumer callback invoked for each converted message; must not be null
     * @param cls target payload type handed to the converter
     * @param <T> payload type
     * @return the subscriber, after {@code startAsync()} has been called on it
     * @throws IllegalArgumentException if {@code str} or {@code consumer} fails validation
     */
    @Override
    public <T> Subscriber subscribeAndConvert(String str, Consumer<ConvertedBasicAcknowledgeablePubsubMessage<T>> consumer, Class<T> cls) {
        Assert.notNull(consumer, "The messageConsumer can't be null.");
        // Same subscription check as the MessageReceiver overload, so a blank name fails fast here.
        Assert.hasText(str, "The subscription can't be null or empty.");

        MessageReceiver receiver = (pubsubMessage, ackReplyConsumer) -> consumer.accept(
                new ConvertedPushedAcknowledgeablePubsubMessage<>(
                        PubSubSubscriptionUtils.toProjectSubscriptionName(str, this.subscriberFactory.getProjectId()),
                        pubsubMessage,
                        getMessageConverter().fromPubSubMessage(pubsubMessage, cls),
                        ackReplyConsumer));

        Subscriber subscriber = this.subscriberFactory.createSubscriber(str, receiver);
        subscriber.startAsync();
        return subscriber;
    }

    /**
     * Executes the pull request through the subscriber stub and wraps every
     * received message, together with its ack id, in an acknowledgeable message.
     *
     * @param pullRequest the request to execute; must not be null
     * @return one acknowledgeable message per received message, in response order
     * @throws IllegalArgumentException if {@code pullRequest} is null
     */
    private List<AcknowledgeablePubsubMessage> pull(PullRequest pullRequest) {
        Assert.notNull(pullRequest, "The pull request can't be null.");

        PullResponse response = this.subscriberStub.pullCallable().call(pullRequest);

        // Every message of one response shares the request's subscription, so resolve the name once.
        ProjectSubscriptionName subscriptionName = PubSubSubscriptionUtils.toProjectSubscriptionName(
                pullRequest.getSubscription(), this.subscriberFactory.getProjectId());

        return response.getReceivedMessagesList().stream()
                .map(received -> new PulledAcknowledgeablePubsubMessage(
                        subscriptionName, received.getMessage(), received.getAckId()))
                .collect(Collectors.toList());
    }

    /**
     * Builds a pull request through the subscriber factory and executes it.
     *
     * @param str name of the subscription to pull from
     * @param num passed to the factory as the maximum number of messages
     * @param bool passed to the factory as the return-immediately flag
     * @return the pulled messages, each of which can be acknowledged individually
     */
    @Override
    public List<AcknowledgeablePubsubMessage> pull(String str, Integer num, Boolean bool) {
        PullRequest request = this.subscriberFactory.createPullRequest(str, num, bool);
        return pull(request);
    }

    /**
     * Pulls messages and converts each payload to {@code cls} with the
     * configured message converter.
     *
     * @param str name of the subscription to pull from
     * @param num passed to the factory as the maximum number of messages
     * @param bool passed to the factory as the return-immediately flag
     * @param cls target payload type handed to the converter
     * @param <T> payload type
     * @return the pulled messages, each carrying its converted payload
     */
    @Override
    public <T> List<ConvertedAcknowledgeablePubsubMessage<T>> pullAndConvert(String str, Integer num, Boolean bool, Class<T> cls) {
        List<AcknowledgeablePubsubMessage> pulled = pull(str, num, bool);
        return pulled.stream()
                .map(message -> new ConvertedPulledAcknowledgeablePubsubMessage<>(
                        message,
                        this.pubSubMessageConverter.fromPubSubMessage(message.getPubsubMessage(), cls)))
                .collect(Collectors.toList());
    }

    /**
     * Pulls messages, requests acknowledgement of everything that was pulled,
     * and returns the raw Pub/Sub messages. The future returned by the
     * acknowledgement is not waited on.
     *
     * @param str name of the subscription; must contain text
     * @param num maximum number of messages; when non-null it must be greater than 0
     * @param bool passed to the factory as the return-immediately flag
     * @return the pulled Pub/Sub messages, possibly empty
     * @throws IllegalArgumentException if {@code str} or {@code num} fails validation
     */
    @Override
    public List<PubsubMessage> pullAndAck(String str, Integer num, Boolean bool) {
        Assert.hasText(str, "The subscription can't be null or empty.");
        if (num != null) {
            Assert.isTrue(num > 0, "The maxMessages must be greater than 0.");
        }

        PullRequest request = this.subscriberFactory.createPullRequest(str, num, bool);
        List<AcknowledgeablePubsubMessage> pulled = pull(request);
        if (!pulled.isEmpty()) {
            ack(pulled);
        }

        return pulled.stream()
                .map(AcknowledgeablePubsubMessage::getPubsubMessage)
                .collect(Collectors.toList());
    }

    /**
     * Pulls and acknowledges at most one message, asking for an immediate return.
     *
     * @param str name of the subscription to pull from
     * @return the pulled message, or {@code null} when nothing was returned
     */
    @Override
    public PubsubMessage pullNext(String str) {
        List<PubsubMessage> messages = pullAndAck(str, 1, true);
        return messages.isEmpty() ? null : messages.get(0);
    }

    /** @return the subscriber factory this template was constructed with */
    public SubscriberFactory getSubscriberFactory() {
        return subscriberFactory;
    }

    /**
     * Acknowledges the given messages, sending one acknowledge request per
     * subscription.
     *
     * @param collection messages to acknowledge; must not be empty
     * @return a future that completes when every request has succeeded, or fails with the first reported error
     * @throws IllegalArgumentException if {@code collection} is null or empty
     */
    @Override
    public ListenableFuture<Void> ack(Collection<AcknowledgeablePubsubMessage> collection) {
        Assert.notEmpty(collection, "The acknowledgeablePubsubMessages can't be empty.");
        // Resolves to the private ack(String, Collection<String>) overload.
        return doBatchedAsyncOperation(collection, this::ack);
    }

    /**
     * Negatively acknowledges the given messages by setting their ack deadline
     * to zero seconds.
     *
     * @param collection messages to nack; validated by {@code modifyAckDeadline}
     * @return the future returned by {@code modifyAckDeadline}
     */
    @Override
    public ListenableFuture<Void> nack(Collection<AcknowledgeablePubsubMessage> collection) {
        final int zeroDeadlineSeconds = 0;
        return modifyAckDeadline(collection, zeroDeadlineSeconds);
    }

    /**
     * Changes the ack deadline of the given messages, sending one request per
     * subscription.
     *
     * @param collection messages whose deadline is changed; must not be empty
     * @param i new ack deadline in seconds; must not be negative
     * @return a future that completes when every request has succeeded, or fails with the first reported error
     * @throws IllegalArgumentException if an argument fails validation
     */
    @Override
    public ListenableFuture<Void> modifyAckDeadline(Collection<AcknowledgeablePubsubMessage> collection, int i) {
        Assert.notEmpty(collection, "The acknowledgeablePubsubMessages can't be empty.");
        Assert.isTrue(i >= 0, "The ackDeadlineSeconds must not be negative.");

        BiFunction<String, List<String>, ApiFuture<Empty>> perSubscriptionCall =
                (subscription, ackIds) -> modifyAckDeadline(subscription, ackIds, i);
        return doBatchedAsyncOperation(collection, perSubscriptionCall);
    }

    /**
     * Releases the resources created by this template: shuts down the default
     * ack executor and closes the subscriber stub. An executor supplied through
     * {@code setAckExecutor} is not touched.
     */
    @Override
    public void destroy() {
        try {
            this.defaultAckExecutor.shutdown();
        } finally {
            // Close the stub even if shutting down the executor throws.
            this.subscriberStub.close();
        }
    }

    /**
     * Sends a single acknowledge request for the given ack ids.
     *
     * @param str subscription name set on the request
     * @param collection ack ids to acknowledge
     * @return the future returned by the stub's acknowledge callable
     */
    private ApiFuture<Empty> ack(String str, Collection<String> collection) {
        AcknowledgeRequest.Builder requestBuilder = AcknowledgeRequest.newBuilder();
        requestBuilder.addAllAckIds(collection);
        requestBuilder.setSubscription(str);
        AcknowledgeRequest request = requestBuilder.build();
        return this.subscriberStub.acknowledgeCallable().futureCall(request);
    }

    /**
     * Sends a single modify-ack-deadline request for the given ack ids.
     *
     * @param str subscription name set on the request
     * @param collection ack ids whose deadline is changed
     * @param i new ack deadline in seconds
     * @return the future returned by the stub's modify-ack-deadline callable
     */
    private ApiFuture<Empty> modifyAckDeadline(String str, Collection<String> collection, int i) {
        ModifyAckDeadlineRequest.Builder requestBuilder = ModifyAckDeadlineRequest.newBuilder();
        requestBuilder.setAckDeadlineSeconds(i);
        requestBuilder.addAllAckIds(collection);
        requestBuilder.setSubscription(str);
        ModifyAckDeadlineRequest request = requestBuilder.build();
        return this.subscriberStub.modifyAckDeadlineCallable().futureCall(request);
    }

    /**
     * Groups the messages' ack ids by subscription, applies the asynchronous
     * operation once per subscription, and combines the outcomes into one future.
     *
     * <p>The returned future completes with {@code null} once every
     * per-subscription call has reported success, and is failed with the error
     * of a call that reports failure. Callbacks run on {@code ackExecutor}.
     *
     * @param collection messages to operate on; all must belong to the same project
     * @param biFunction operation taking the subscription name (as a string) and that subscription's ack ids
     * @return future describing the combined outcome
     * @throws IllegalStateException if the messages do not all share one project id
     */
    private ListenableFuture<Void> doBatchedAsyncOperation(Collection<AcknowledgeablePubsubMessage> collection, BiFunction<String, List<String>, ApiFuture<Empty>> biFunction) {
        Map<ProjectSubscriptionName, List<String>> ackIdsBySubscription = collection.stream()
                .collect(Collectors.groupingBy(
                        AcknowledgeablePubsubMessage::getProjectSubscriptionName,
                        Collectors.mapping(AcknowledgeablePubsubMessage::getAckId, Collectors.toList())));

        long distinctProjects = ackIdsBySubscription.keySet().stream()
                .map(ProjectSubscriptionName::getProject)
                .distinct()
                .count();
        Assert.state(distinctProjects == 1, "The project id of all messages must match.");

        SettableListenableFuture<Void> combined = new SettableListenableFuture<>();
        int expectedSuccesses = ackIdsBySubscription.size();
        AtomicInteger successes = new AtomicInteger();

        ackIdsBySubscription.forEach((subscriptionName, ackIds) -> {
            ApiFuture<Empty> pending = biFunction.apply(subscriptionName.toString(), ackIds);
            ApiFutures.addCallback(pending, new ApiFutureCallback<Empty>() {
                @Override
                public void onFailure(Throwable th) {
                    processResult(th);
                }

                @Override
                public void onSuccess(Empty empty) {
                    processResult(null);
                }

                /** Fails the combined future on error; completes it once the last success arrives. */
                private void processResult(Throwable th) {
                    if (th != null) {
                        combined.setException(th);
                    } else if (successes.incrementAndGet() == expectedSuccesses) {
                        combined.set(null);
                    }
                }
            }, this.ackExecutor);
        });
        return combined;
    }
}
