package org.apache.seatunnel.connectors.seatunnel.pulsar.source.reader;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.seatunnel.common.Handover;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConsumerConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.start.StartCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.enumerator.cursor.stop.StopCursor;
import org.apache.seatunnel.connectors.seatunnel.pulsar.source.split.PulsarPartitionSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/pulsar/source/reader/PulsarSplitReaderThread.class */
/**
 * Thread that polls a Pulsar consumer for one {@link PulsarPartitionSplit} and passes every
 * received message to the shared {@link Handover} wrapped in a {@link RecordWithSplitId}.
 *
 * <p>The loop ends when {@link #close()} clears the {@code running} flag, when the split's stop
 * cursor reports that reading should stop, or when any {@link Throwable} is raised (which is then
 * reported through the handover). The consumer is closed when the loop ends.
 */
public class PulsarSplitReaderThread extends Thread implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSplitReaderThread.class);
    protected final PulsarSourceReader sourceReader;
    protected final PulsarPartitionSplit split;
    protected final PulsarClient pulsarClient;
    protected final PulsarConsumerConfig consumerConfig;
    /** Timeout, in milliseconds, passed to {@code Consumer.receive}. */
    protected final int pollTimeout;
    /** Pause, in milliseconds, passed to {@code Thread.sleep} between two polls. */
    protected final long pollInterval;
    protected final StartCursor startCursor;
    protected final Handover<RecordWithSplitId> handover;
    /** Created by {@link #open()} or lazily by {@link #committingCursor(MessageId)}. */
    protected Consumer<byte[]> consumer;
    /** Loop guard of {@link #run()}; set by {@link #open()} and cleared by {@link #close()}. */
    private volatile boolean running;

    /**
     * Stores the collaborators; no Pulsar resource is created here.
     *
     * @param pulsarSourceReader reader notified when the stop cursor is reached
     * @param pulsarPartitionSplit split (partition, stop cursor, last consumed id) to read
     * @param pulsarClient client used to build the consumer
     * @param pulsarConsumerConfig consumer settings used to build the consumer
     * @param pollTimeout receive timeout in milliseconds
     * @param pollInterval sleep between polls in milliseconds
     * @param startCursor applied to a fresh consumer when the split has no consumed id yet
     * @param handover channel that receives the records and any error raised by the loop
     */
    public PulsarSplitReaderThread(PulsarSourceReader pulsarSourceReader, PulsarPartitionSplit pulsarPartitionSplit, PulsarClient pulsarClient, PulsarConsumerConfig pulsarConsumerConfig, int pollTimeout, long pollInterval, StartCursor startCursor, Handover<RecordWithSplitId> handover) {
        this.sourceReader = pulsarSourceReader;
        this.split = pulsarPartitionSplit;
        this.pulsarClient = pulsarClient;
        this.consumerConfig = pulsarConsumerConfig;
        this.pollTimeout = pollTimeout;
        this.pollInterval = pollInterval;
        this.startCursor = startCursor;
        this.handover = handover;
    }

    /**
     * Creates the consumer for the split, positions it with the start cursor when the split has
     * no latest consumed id, and enables the read loop.
     *
     * @throws PulsarClientException if seeking the start position fails
     */
    public void open() throws PulsarClientException {
        this.consumer = createPulsarConsumer(this.split);
        if (this.split.getLatestConsumedId() == null) {
            this.startCursor.seekPosition(this.consumer);
        }
        // NOTE(review): nothing in this class ever set the flag to true, so the loop in run()
        // could never execute. Enabling it once the consumer is ready is the assumed intent;
        // confirm that callers invoke open() before start().
        this.running = true;
    }

    /**
     * Polls the consumer until stopped and forwards every message to the handover.
     *
     * <p>Each pass waits up to {@code pollTimeout} ms for a message. A received message is
     * produced to the handover; if the stop cursor then says to stop, the source reader is told
     * that the split has no more elements and the loop exits. Otherwise the thread sleeps
     * {@code pollInterval} ms before polling again. Any failure is reported via
     * {@code handover.reportError} instead of propagating out of the thread.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            final StopCursor stopCursor = this.split.getStopCursor();
            while (this.running) {
                Message<byte[]> message = this.consumer.receive(this.pollTimeout, TimeUnit.MILLISECONDS);
                if (message != null) {
                    this.handover.produce(new RecordWithSplitId(message, this.split.splitId()));
                    if (stopCursor.shouldStop(message)) {
                        this.sourceReader.handleNoMoreElements(this.split.splitId(), message.getMessageId());
                        break;
                    }
                }
                Thread.sleep(this.pollInterval);
            }
        } catch (Throwable t) {
            this.handover.reportError(t);
        } finally {
            // Always release the consumer; a failure here is only logged so that it cannot
            // mask the outcome of the loop above.
            try {
                this.consumer.close();
            } catch (Throwable t) {
                LOG.warn("Error while closing pulsar consumer", t);
            }
        }
    }

    /**
     * Clears the {@code running} flag so the read loop stops at its next check, then closes the
     * consumer if one has been created.
     *
     * @throws IOException if closing the consumer fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.running = false;
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    /**
     * Cumulatively acknowledges {@code messageId} on the consumer, creating the consumer first
     * when none exists yet.
     *
     * @param messageId id to acknowledge cumulatively
     * @throws PulsarClientException if the acknowledgement fails
     */
    public void committingCursor(MessageId messageId) throws PulsarClientException {
        if (this.consumer == null) {
            this.consumer = createPulsarConsumer(this.split);
        }
        this.consumer.acknowledgeCumulative(messageId);
    }

    /**
     * Builds a consumer from the client and consumer config and subscribes it to the full topic
     * name of the split's partition.
     *
     * @param pulsarPartitionSplit split whose partition topic is subscribed to
     * @return the subscribed consumer
     * @throws PulsarConnectorException wrapping the {@link PulsarClientException} when the
     *     subscription fails
     */
    protected Consumer<byte[]> createPulsarConsumer(PulsarPartitionSplit pulsarPartitionSplit) {
        ConsumerBuilder<byte[]> createConsumerBuilder = PulsarConfigUtil.createConsumerBuilder(this.pulsarClient, this.consumerConfig);
        createConsumerBuilder.topic(pulsarPartitionSplit.getPartition().getFullTopicName());
        try {
            return createConsumerBuilder.subscribe();
        } catch (PulsarClientException e) {
            throw new PulsarConnectorException(PulsarConnectorErrorCode.OPEN_PULSAR_ADMIN_FAILED, "Failed to create pulsar consumer:", e);
        }
    }
}
