package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.protobuf.util.Timestamps;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.util.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/SubscriptionPartitionProcessorImpl.class */
/**
 * {@link SubscriptionPartitionProcessor} that drains messages buffered by a
 * {@link MemoryBufferedSubscriber}, claiming each message's offset and byte size on the
 * restriction tracker before emitting the message to the output receiver.
 */
class SubscriptionPartitionProcessorImpl implements SubscriptionPartitionProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(SubscriptionPartitionProcessorImpl.class);

    private final SubscriptionPartition subscriptionPartition;
    private final RestrictionTracker<OffsetByteRange, OffsetByteProgress> tracker;
    private final DoFn.OutputReceiver<SequencedMessage> receiver;
    private final MemoryBufferedSubscriber subscriber;

    /** Offset of the most recent message successfully claimed by {@link #runFor}; empty until then. */
    private Optional<Offset> lastClaimedOffset = Optional.empty();

    /**
     * Creates a processor and obtains a subscriber positioned at the start of the tracker's
     * current restriction.
     *
     * @param subscriptionPartition the subscription partition being processed (used for logging)
     * @param restrictionTracker tracker on which message offsets are claimed
     * @param outputReceiver receiver that claimed messages are emitted to
     * @param supplier source of candidate subscribers; invoked repeatedly until one whose fetch
     *     offset equals the restriction's start offset is returned
     */
    public SubscriptionPartitionProcessorImpl(SubscriptionPartition subscriptionPartition, RestrictionTracker<OffsetByteRange, OffsetByteProgress> restrictionTracker, DoFn.OutputReceiver<SequencedMessage> outputReceiver, Supplier<MemoryBufferedSubscriber> supplier) {
        this.subscriptionPartition = subscriptionPartition;
        this.tracker = restrictionTracker;
        this.receiver = outputReceiver;
        // Must come last: getReadySubscriber reads this.tracker and this.subscriptionPartition.
        this.subscriber = getReadySubscriber(supplier);
    }

    /**
     * Processes buffered messages until the given duration elapses, a claim is rejected, or the
     * subscriber stops running.
     *
     * <p>Each pass first drains every message currently available from the subscriber, then waits
     * on the subscriber's {@code onData()} future for at most the time remaining before the
     * deadline.
     *
     * @param duration how long, measured from the moment of the call, to keep processing
     * @return {@code ProcessContinuation.stop()} if the tracker rejects a claim; otherwise
     *     {@code ProcessContinuation.resume()} (on timeout or when the subscriber is not running)
     * @throws RuntimeException wrapping any failure other than a timeout raised while waiting for
     *     more data
     */
    @Override
    public DoFn.ProcessContinuation runFor(Duration duration) {
        Instant deadline = Instant.now().plus(duration);
        while (this.subscriber.isRunning()) {
            // Drain everything that is already buffered before blocking.
            Optional<SequencedMessage> next = this.subscriber.peek();
            while (next.isPresent()) {
                SequencedMessage message = next.get();
                Offset offset = Offset.of(message.getCursor().getOffset());
                if (!this.tracker.tryClaim(OffsetByteProgress.of(offset, message.getSizeBytes()))) {
                    // The message is left in the subscriber (not popped) when the claim fails.
                    return DoFn.ProcessContinuation.stop();
                }
                // Only remove the message from the buffer once its claim has succeeded.
                this.subscriber.pop();
                this.lastClaimedOffset = Optional.of(offset);
                this.receiver.outputWithTimestamp(message, new Instant(Timestamps.toMillis(message.getPublishTime())));
                next = this.subscriber.peek();
            }
            try {
                // Wait for more data, but never past the overall deadline.
                Duration remaining = new Duration(Instant.now(), deadline);
                ApiFuture<?> onData = this.subscriber.onData();
                Preconditions.checkArgumentNotNull(onData);
                onData.get(remaining.getMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Time budget exhausted without new data: yield and ask to be resumed.
                return DoFn.ProcessContinuation.resume();
            } catch (Throwable t) {
                throw new RuntimeException(t);
            }
        }
        return DoFn.ProcessContinuation.resume();
    }

    /**
     * Returns the offset of the last message claimed by {@link #runFor}, or an empty optional if
     * no message has been claimed yet.
     */
    @Override
    public Optional<Offset> lastClaimed() {
        return this.lastClaimedOffset;
    }

    /**
     * Pulls subscribers from {@code supplier} until one is found whose fetch offset equals the
     * start offset of the tracker's current restriction, calls {@code rebuffer()} on it and
     * returns it. Mismatching subscribers are stopped and discarded.
     *
     * @param supplier source of candidate subscribers
     * @return a subscriber whose fetch offset matches the restriction start
     */
    private MemoryBufferedSubscriber getReadySubscriber(Supplier<MemoryBufferedSubscriber> supplier) {
        Offset startOffset = Offset.of(this.tracker.currentRestriction().getRange().getFrom());
        while (true) {
            MemoryBufferedSubscriber candidate = supplier.get();
            Offset fetchOffset = candidate.fetchOffset();
            if (startOffset.equals(fetchOffset)) {
                candidate.rebuffer();
                return candidate;
            }
            LOG.info("Discarding subscriber due to mismatch, this should be rare. {}, start: {} fetch: {}", this.subscriptionPartition, startOffset, fetchOffset);
            try {
                candidate.stopAsync().awaitTerminated();
            } catch (Exception e) {
                // Best effort: the subscriber is being discarded either way, so a failed shutdown
                // must not prevent trying the next candidate. Record it rather than hiding it.
                LOG.debug("Failed to cleanly stop discarded subscriber for {}", this.subscriptionPartition, e);
            }
        }
    }
}
