package com.alibaba.otter.canal.client.pulsarmq;

import com.alibaba.fastjson2.JSON;
import com.alibaba.google.common.collect.Lists;
import com.alibaba.otter.canal.client.CanalMQConnector;
import com.alibaba.otter.canal.client.CanalMessageDeserializer;
import com.alibaba.otter.canal.common.utils.MQUtil;
import com.alibaba.otter.canal.protocol.FlatMessage;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/otter/canal/client/pulsarmq/PulsarMQCanalConnector.class */
/**
 * {@link CanalMQConnector} implementation that consumes canal messages from Apache Pulsar.
 *
 * <p>Expected call sequence: {@link #connect()}, {@link #subscribe()}, then repeatedly one of the
 * {@code get*} methods followed by {@link #ack()} or {@link #rollback()}, and finally
 * {@link #disconnect()}.
 *
 * <p>At most one received batch may be outstanding at a time: a fetch fails while a previously
 * fetched batch has been neither acknowledged nor rolled back. Only {@link #subscribe(String)} is
 * synchronized; the other methods perform no locking of their own.
 */
public class PulsarMQCanalConnector implements CanalMQConnector {
    private static final Logger logger = LoggerFactory.getLogger(PulsarMQCanalConnector.class);

    private static final int DEFAULT_BATCH_SIZE = 30;
    private static final int DEFAULT_GET_BATCH_TIMEOUT_SECONDS = 30;
    private static final int DEFAULT_BATCH_PROCESS_TIMEOUT_SECONDS = 60;
    private static final int DEFAULT_REDELIVERY_DELAY_SECONDS = 60;
    private static final int DEFAULT_ACK_TIMEOUT_SECONDS = 30;
    private static final boolean DEFAULT_IS_RETRY = true;
    private static final boolean DEFAULT_IS_RETRY_DLQ_UPPER_CASE = false;
    private static final int DEFAULT_MAX_REDELIVERY_COUNT = 128;

    /** Batch returned by the last fetch that has not been acked or rolled back yet; null if none. */
    private volatile Messages<byte[]> lastGetBatchMessage;
    private PulsarClient pulsarClient;
    private Consumer<byte[]> consumer;
    private final boolean isFlatMessage;
    private final String topic;
    private final String serviceUrl;
    private final String roleToken;
    private final String subscriptName;
    private final int batchSize;
    private final int getBatchTimeoutSeconds;
    // Stored from the constructor but not read anywhere in this class.
    private final int batchProcessTimeoutSeconds;
    private final int redeliveryDelaySeconds;
    private final int ackTimeoutSeconds;
    private final boolean isRetry;
    private final boolean isRetryDLQUpperCase;
    private final int maxRedeliveryCount;
    /** True once a consumer has been created by {@link #subscribe(String)}; read without locking. */
    private volatile boolean connected = false;

    /**
     * Creates a connector using the default batching, timeout and retry settings.
     *
     * @param isFlatMessage when true payloads are parsed as JSON {@link FlatMessage}s, otherwise
     *                      they are decoded with {@link CanalMessageDeserializer}
     * @param serviceUrl    Pulsar service URL
     * @param roleToken     token passed to {@link AuthenticationFactory#token(String)}
     * @param topic         topic (or topic pattern) to consume
     * @param subscriptName Pulsar subscription name; must not be empty
     * @throws RuntimeException if {@code subscriptName} is empty
     */
    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
                                  String subscriptName) {
        this(isFlatMessage, serviceUrl, roleToken, topic, subscriptName,
            DEFAULT_BATCH_SIZE,
            DEFAULT_GET_BATCH_TIMEOUT_SECONDS,
            DEFAULT_BATCH_PROCESS_TIMEOUT_SECONDS,
            DEFAULT_REDELIVERY_DELAY_SECONDS,
            DEFAULT_ACK_TIMEOUT_SECONDS,
            DEFAULT_IS_RETRY,
            DEFAULT_IS_RETRY_DLQ_UPPER_CASE,
            DEFAULT_MAX_REDELIVERY_COUNT);
    }

    /**
     * Creates a fully configured connector.
     *
     * @param isFlatMessage              when true payloads are parsed as JSON {@link FlatMessage}s,
     *                                   otherwise they are decoded with {@link CanalMessageDeserializer}
     * @param serviceUrl                 Pulsar service URL
     * @param roleToken                  token passed to {@link AuthenticationFactory#token(String)}
     * @param topic                      topic (or topic pattern) to consume
     * @param subscriptName              Pulsar subscription name; must not be empty
     * @param batchSize                  maximum number of messages per batch receive
     * @param getBatchTimeoutSeconds     batch receive timeout, in seconds
     * @param batchProcessTimeoutSeconds stored but not used by this class
     * @param redeliveryDelaySeconds     redelivery delay after a negative acknowledge, in seconds
     * @param ackTimeoutSeconds          consumer ack timeout, in seconds
     * @param isRetry                    whether to enable retry with a dead-letter policy
     * @param isRetryDLQUpperCase        whether the retry/DLQ topic suffixes are upper case
     * @param maxRedeliveryCount         maximum redelivery count for the dead-letter policy
     * @throws RuntimeException if {@code subscriptName} is empty
     */
    public PulsarMQCanalConnector(boolean isFlatMessage, String serviceUrl, String roleToken, String topic,
                                  String subscriptName, int batchSize, int getBatchTimeoutSeconds,
                                  int batchProcessTimeoutSeconds, int redeliveryDelaySeconds,
                                  int ackTimeoutSeconds, boolean isRetry, boolean isRetryDLQUpperCase,
                                  int maxRedeliveryCount) {
        if (StringUtils.isEmpty(subscriptName)) {
            // IllegalArgumentException is a RuntimeException, so existing catch blocks keep working.
            throw new IllegalArgumentException("Pulsar Consumer subscriptName required");
        }
        this.isFlatMessage = isFlatMessage;
        this.serviceUrl = serviceUrl;
        this.roleToken = roleToken;
        this.topic = topic;
        this.subscriptName = subscriptName;
        this.batchSize = batchSize;
        this.getBatchTimeoutSeconds = getBatchTimeoutSeconds;
        this.batchProcessTimeoutSeconds = batchProcessTimeoutSeconds;
        this.redeliveryDelaySeconds = redeliveryDelaySeconds;
        this.ackTimeoutSeconds = ackTimeoutSeconds;
        this.isRetry = isRetry;
        this.isRetryDLQUpperCase = isRetryDLQUpperCase;
        this.maxRedeliveryCount = maxRedeliveryCount;
    }

    /**
     * Builds the Pulsar client for the configured service URL and token. Does nothing when a
     * client already exists, so repeated calls do not leak clients.
     *
     * @throws CanalClientException if the client cannot be created
     */
    @Override
    public void connect() throws CanalClientException {
        if (this.pulsarClient != null) {
            return;
        }
        try {
            this.pulsarClient = PulsarClient.builder()
                .serviceUrl(this.serviceUrl)
                .authentication(AuthenticationFactory.token(this.roleToken))
                .build();
        } catch (PulsarClientException e) {
            throw new CanalClientException(e);
        }
    }

    /**
     * Closes the consumer (if connected) and the client, then clears all connection state so a
     * later {@link #connect()} starts from scratch. Close failures are logged, not rethrown.
     */
    @Override
    public void disconnect() throws CanalClientException {
        try {
            if (this.consumer != null && this.consumer.isConnected()) {
                this.consumer.close();
            }
        } catch (PulsarClientException e) {
            logger.error("close pulsar consumer error", e);
        }
        try {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        } catch (PulsarClientException e) {
            logger.error("close pulsar client error", e);
        }
        // Drop references to the closed resources and to any batch that can no longer be acked.
        this.consumer = null;
        this.pulsarClient = null;
        this.lastGetBatchMessage = null;
        this.connected = false;
    }

    /**
     * @return true once {@link #subscribe(String)} has created a consumer and
     *         {@link #disconnect()} has not been called since
     */
    @Override
    public boolean checkValid() throws CanalClientException {
        return this.connected;
    }

    /**
     * Creates the Pulsar consumer for the configured topic using a Failover subscription.
     * Does nothing if a consumer has already been created.
     *
     * @param filter ignored by this implementation
     * @throws CanalClientException if {@link #connect()} has not been called or subscribing fails
     */
    @Override
    public synchronized void subscribe(String filter) throws CanalClientException {
        if (this.connected) {
            return;
        }
        if (this.pulsarClient == null) {
            throw new CanalClientException("Pulsar client not initialized, call connect() first");
        }

        ConsumerBuilder<byte[]> builder = this.pulsarClient.newConsumer();
        if (MQUtil.isPatternTopic(this.topic)) {
            builder.topicsPattern(this.topic);
        } else {
            builder.topic(this.topic);
        }
        builder.subscriptionType(SubscriptionType.Failover);
        builder.negativeAckRedeliveryDelay(this.redeliveryDelaySeconds, TimeUnit.SECONDS)
            .subscriptionName(this.subscriptName);

        if (this.isRetry) {
            DeadLetterPolicy.DeadLetterPolicyBuilder deadLetterBuilder =
                DeadLetterPolicy.builder().maxRedeliverCount(this.maxRedeliveryCount);
            // Explicit retry/DLQ topic names are only derived when the check below fails.
            // NOTE(review): the topic branch above uses MQUtil.isPatternTopic while this one uses
            // MQUtil.isPatternTag — confirm which check is intended here.
            if (!MQUtil.isPatternTag(this.topic)) {
                deadLetterBuilder.retryLetterTopic(this.topic + (this.isRetryDLQUpperCase ? "-RETRY" : "-retry"));
                deadLetterBuilder.deadLetterTopic(this.topic + (this.isRetryDLQUpperCase ? "-DLQ" : "-dlq"));
            }
            builder.enableRetry(true).deadLetterPolicy(deadLetterBuilder.build());
        }

        builder.ackTimeout(this.ackTimeoutSeconds, TimeUnit.SECONDS);
        builder.batchReceivePolicy(new BatchReceivePolicy.Builder()
            .maxNumMessages(this.batchSize)
            .timeout(this.getBatchTimeoutSeconds, TimeUnit.SECONDS)
            .build());

        try {
            this.consumer = builder.subscribe();
            this.connected = true;
        } catch (PulsarClientException e) {
            throw new CanalClientException(e);
        }
    }

    /** Equivalent to {@code subscribe(null)}. */
    @Override
    public void subscribe() throws CanalClientException {
        subscribe(null);
    }

    /**
     * Unsubscribes the consumer, if one exists.
     *
     * @throws CanalClientException if Pulsar reports an error
     */
    @Override
    public void unsubscribe() throws CanalClientException {
        // NOTE(review): `connected` is left untouched here, so a later subscribe() returns early;
        // confirm whether the consumer is still usable after Pulsar's unsubscribe().
        try {
            if (this.consumer != null) {
                this.consumer.unsubscribe();
            }
        } catch (PulsarClientException e) {
            throw new CanalClientException(e.getMessage(), e);
        }
    }

    /**
     * Fetches one batch and acknowledges it immediately.
     *
     * @param timeout ignored; the batch receive timeout configured at construction is used
     * @param unit    ignored
     * @return the decoded messages, possibly empty, never null
     * @throws CanalClientException if receiving or decoding fails, or a batch is still outstanding
     */
    @Override
    public List<Message> getList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<Message> messages = getListWithoutAck(timeout, unit);
        // Always ack: a batch whose payloads were all null decodes to an empty list but is still
        // outstanding, and would otherwise block every later fetch. ack() is a no-op when no
        // batch is outstanding.
        ack();
        return messages;
    }

    /**
     * Fetches one batch without acknowledging it. Call {@link #ack()} or {@link #rollback()}
     * before the next fetch.
     *
     * @param timeout ignored; the batch receive timeout configured at construction is used
     * @param unit    ignored
     * @return the decoded messages, possibly empty, never null
     * @throws CanalClientException if receiving or decoding fails, or a batch is still outstanding
     */
    @Override
    public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return getListWithoutAck();
    }

    /**
     * Flat-message variant of {@link #getList(Long, TimeUnit)}.
     *
     * @param timeout ignored; the batch receive timeout configured at construction is used
     * @param unit    ignored
     * @return the decoded messages, possibly empty, never null
     * @throws CanalClientException if receiving or decoding fails, or a batch is still outstanding
     */
    @Override
    public List<FlatMessage> getFlatList(Long timeout, TimeUnit unit) throws CanalClientException {
        List<FlatMessage> messages = getFlatListWithoutAck(timeout, unit);
        // See getList: always ack so a batch with only null payloads cannot stay outstanding.
        ack();
        return messages;
    }

    /**
     * Flat-message variant of {@link #getListWithoutAck(Long, TimeUnit)}.
     *
     * @param timeout ignored; the batch receive timeout configured at construction is used
     * @param unit    ignored
     * @return the decoded messages, possibly empty, never null
     * @throws CanalClientException if receiving or decoding fails, or a batch is still outstanding
     */
    @Override
    public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) throws CanalClientException {
        return getListWithoutAck();
    }

    /**
     * Receives one batch from Pulsar and decodes every non-null payload.
     *
     * <p>The element type of the result is decided by the {@code isFlatMessage} flag given at
     * construction, not by the caller's type argument: {@link FlatMessage} when the flag is true,
     * {@link Message} otherwise.
     *
     * <p>If decoding fails the received batch stays outstanding until {@link #rollback()} or
     * {@link #ack()} is called.
     */
    @SuppressWarnings("unchecked") // element type is selected at runtime by isFlatMessage
    private <T> List<T> getListWithoutAck() {
        if (this.lastGetBatchMessage != null) {
            throw new CanalClientException("mq get/ack not support concurrent & async ack");
        }
        Consumer<byte[]> activeConsumer = this.consumer;
        if (activeConsumer == null) {
            throw new CanalClientException("Pulsar consumer not initialized, call subscribe() first");
        }

        Messages<byte[]> batch;
        try {
            batch = activeConsumer.batchReceive();
        } catch (PulsarClientException e) {
            logger.error("Receiver Pulsar MQ message error", e);
            throw new CanalClientException(e);
        }

        List<T> decoded = new ArrayList<>();
        if (batch == null || batch.size() < 1) {
            this.lastGetBatchMessage = null;
            return decoded;
        }
        // Remember the batch before decoding so ack()/rollback() can act on it even if decoding fails.
        this.lastGetBatchMessage = batch;

        for (org.apache.pulsar.client.api.Message<byte[]> pulsarMessage : batch) {
            byte[] data = pulsarMessage.getData();
            if (data == null) {
                logger.warn("Received message data is null");
                continue;
            }
            try {
                if (this.isFlatMessage) {
                    decoded.add((T) JSON.parseObject(data, FlatMessage.class));
                } else {
                    decoded.add((T) CanalMessageDeserializer.deserializer(data));
                }
            } catch (Exception e) {
                logger.error("Add message error", e);
                throw new CanalClientException(e);
            }
        }
        return decoded;
    }

    /**
     * Acknowledges the outstanding batch, if any. When the acknowledge fails the failure is
     * logged and the batch is negatively acknowledged instead. The outstanding batch is cleared
     * in every case.
     */
    @Override
    public void ack() throws CanalClientException {
        Messages<byte[]> batch = this.lastGetBatchMessage;
        Consumer<byte[]> activeConsumer = this.consumer;
        try {
            if (batch != null && activeConsumer != null) {
                activeConsumer.acknowledge(batch);
            }
        } catch (Throwable th) {
            // Kept as a broad best-effort catch; the failure is now logged instead of being dropped.
            logger.error("ack pulsar batch message error, negative acknowledge instead", th);
            activeConsumer.negativeAcknowledge(batch);
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /**
     * Negatively acknowledges the outstanding batch, if any, and clears it.
     */
    @Override
    public void rollback() throws CanalClientException {
        Messages<byte[]> batch = this.lastGetBatchMessage;
        Consumer<byte[]> activeConsumer = this.consumer;
        try {
            if (batch != null && activeConsumer != null) {
                activeConsumer.negativeAcknowledge(batch);
            }
        } finally {
            this.lastGetBatchMessage = null;
        }
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message get(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message get(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message getWithoutAck(int batchSize) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public Message getWithoutAck(int batchSize, Long timeout, TimeUnit unit) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public void ack(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }

    /** Not supported by the MQ connector; always throws {@link CanalClientException}. */
    @Override
    public void rollback(long batchId) throws CanalClientException {
        throw new CanalClientException("mq not support this method");
    }
}
