package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.io.gcp.pubsublite.internal.MemoryLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/internal/MemoryBufferedSubscriberImpl.class */
/**
 * {@link MemoryBufferedSubscriber} implementation that queues received messages in memory and
 * sizes the byte allowance it grants to the wrapped {@link Subscriber} from blocks claimed on a
 * shared {@link MemoryLimiter}.
 *
 * <p>All mutable state is guarded by this object's monitor: every method that reads or writes it
 * is {@code synchronized}.
 */
class MemoryBufferedSubscriberImpl extends ProxyService implements MemoryBufferedSubscriber {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryBufferedSubscriberImpl.class);

    private final Partition partition;
    private final MemoryLimiter limiter;
    private final Subscriber subscriber;
    /** Messages received from the subscriber that have not been popped yet. */
    private final Queue<SequencedMessage> messages = new ArrayDeque<>();

    /** Current desired block size; adjusted on every {@link #rebuffer()} call. */
    private long targetMemory;
    /** Offset returned by {@link #fetchOffset()}; advanced by {@link #pop()}. */
    private Offset fetchOffset;
    /** The block currently claimed from the limiter. */
    private MemoryLimiter.Block memBlock;
    /** Bytes allowed to the subscriber that have not yet come back through {@code onReceive}. */
    private long bytesOutstandingToServer = 0L;
    /** Bytes allowed to the subscriber that have not yet been released through {@link #pop()}. */
    private long bytesOutstanding = 0L;
    /** Pending future handed out by {@link #onData()}; completed and replaced on each batch. */
    private SettableApiFuture<Void> newData = SettableApiFuture.create();
    private boolean shutdown = false;

    /**
     * Creates the subscriber, registers it as a managed service and claims the initial memory
     * block.
     *
     * @param partition the partition, used only for logging in this class
     * @param offset the initial value reported by {@link #fetchOffset()}
     * @param memoryLimiter limiter from which memory blocks are claimed; its
     *     {@code maxBlockSize()} is the initial target
     * @param function factory that builds the underlying {@link Subscriber} from the message
     *     callback of this instance
     */
    public MemoryBufferedSubscriberImpl(
            Partition partition,
            Offset offset,
            MemoryLimiter memoryLimiter,
            Function<Consumer<List<SequencedMessage>>, Subscriber> function) {
        this.partition = partition;
        this.fetchOffset = offset;
        this.limiter = memoryLimiter;
        this.targetMemory = memoryLimiter.maxBlockSize();
        this.subscriber = function.apply(this::onReceive);
        addServices(this.subscriber);
        this.memBlock = memoryLimiter.claim(this.targetMemory);
    }

    /**
     * Grants the subscriber the whole initially claimed block as byte allowance, with
     * {@code Long.MAX_VALUE} allowed messages, and records those bytes as outstanding.
     *
     * @throws CheckedApiException if the subscriber rejects the flow control request
     */
    @Override
    protected synchronized void start() throws CheckedApiException {
        long claimed = memBlock.claimed();
        bytesOutstandingToServer += claimed;
        bytesOutstanding += claimed;
        subscriber.allowFlow(
                FlowControlRequest.newBuilder()
                        .setAllowedBytes(claimed)
                        .setAllowedMessages(Long.MAX_VALUE)
                        .build());
    }

    /**
     * Marks this instance as shut down, completes the pending data future so waiters wake up, and
     * closes the claimed memory block. Calls after the first one are no-ops.
     */
    @Override
    protected synchronized void stop() {
        if (shutdown) {
            return;
        }
        shutdown = true;
        newData.set(null);
        memBlock.close();
    }

    /**
     * Handles a permanent error by shutting down via {@link #stop()}.
     *
     * @param checkedApiException the error; not inspected here
     */
    @Override
    protected synchronized void handlePermanentError(CheckedApiException checkedApiException) {
        stop();
    }

    /**
     * Callback given to the subscriber factory: buffers a received batch and signals waiters.
     * Batches arriving after shutdown are dropped.
     */
    private synchronized void onReceive(List<SequencedMessage> list) {
        if (shutdown) {
            return;
        }
        for (SequencedMessage message : list) {
            bytesOutstandingToServer -= message.getSizeBytes();
        }
        messages.addAll(list);
        // Complete the future handed out so far, then install a fresh one for the next batch.
        newData.set(null);
        newData = SettableApiFuture.create();
    }

    /** Returns the initial offset, or one past the offset of the most recently popped message. */
    @Override
    public synchronized Offset fetchOffset() {
        return fetchOffset;
    }

    /**
     * Re-evaluates the memory target, swaps the claimed block for one of the new size and grants
     * the subscriber any bytes that block provides beyond what is already outstanding. Does
     * nothing after shutdown.
     *
     * @throws ApiException the underlying error when the subscriber rejects the flow control
     *     request
     */
    @Override
    public synchronized void rebuffer() throws ApiException {
        if (shutdown) {
            return;
        }
        if (bytesOutstandingToServer < targetMemory / 3) {
            // Less than a third of the target is still owed by the server: double the target,
            // capped at the limiter's maximum block size.
            targetMemory = Math.min(limiter.maxBlockSize(), targetMemory * 2);
        } else if (bytesOutstandingToServer > (2 * targetMemory) / 3) {
            // More than two thirds of the target is still owed by the server: halve the target,
            // floored at the limiter's minimum block size.
            targetMemory = Math.max(limiter.minBlockSize(), targetMemory / 2);
        }
        // Never ask for less than what is already accounted for as outstanding.
        long claimTarget = Math.max(bytesOutstanding, targetMemory);
        memBlock.close();
        memBlock = limiter.claim(claimTarget);
        long toAllow = Math.max(memBlock.claimed() - bytesOutstanding, 0L);
        if (toAllow <= 0) {
            LOG.debug(
                    "Not claiming memory: partition {} outstanding {} to server {} target {} claimed {} messages {}",
                    partition,
                    bytesOutstanding,
                    bytesOutstandingToServer,
                    targetMemory,
                    memBlock.claimed(),
                    messages.size());
            return;
        }
        bytesOutstanding += toAllow;
        bytesOutstandingToServer += toAllow;
        try {
            subscriber.allowFlow(FlowControlRequest.newBuilder().setAllowedBytes(toAllow).build());
        } catch (CheckedApiException e) {
            throw e.underlying;
        }
    }

    /** Returns the oldest buffered message without removing it, or empty if none is buffered. */
    @Override
    public synchronized Optional<SequencedMessage> peek() {
        return Optional.ofNullable(messages.peek());
    }

    /**
     * Removes the oldest buffered message, subtracts its size from the outstanding byte count and
     * advances the fetch offset to one past that message's offset.
     *
     * @throws java.util.NoSuchElementException if no message is buffered
     */
    @Override
    public synchronized void pop() {
        SequencedMessage head = messages.remove();
        bytesOutstanding -= head.getSizeBytes();
        fetchOffset = Offset.of(head.getCursor().getOffset() + 1);
    }

    /**
     * Returns a future that is already complete when this instance is shut down or has buffered
     * messages; otherwise returns the pending future that completes when the next batch is
     * received or this instance is stopped.
     */
    @Override
    public synchronized ApiFuture<Void> onData() {
        if (shutdown || !messages.isEmpty()) {
            return ApiFutures.immediateFuture(null);
        }
        return newData;
    }
}
