package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.SequencedPublisher;
import com.google.cloud.pubsublite.internal.wire.BatchPublisherImpl;
import com.google.cloud.pubsublite.internal.wire.SerialBatcher;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.PublishRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Monitor;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PublisherImpl.class */
public final class PublisherImpl extends ProxyService implements SequencedPublisher<Offset>, RetryingConnectionObserver<MessagePublishResponse> {
    private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
    private final AlarmFactory alarmFactory;
    private final PublishRequest initialRequest;
    private final CloseableMonitor monitor;
    private final Monitor.Guard noneInFlight;

    @GuardedBy("monitor.monitor")
    private Optional<Future<?>> alarmFuture;

    @GuardedBy("monitor.monitor")
    private final RetryingConnection<PublishRequest, BatchPublisher> connection;

    @GuardedBy("monitor.monitor")
    private boolean shutdown;

    @GuardedBy("monitor.monitor")
    private Offset lastSentOffset;
    private final CloseableMonitor batcherMonitor;

    @GuardedBy("batcherMonitor.monitor")
    private final SerialBatcher batcher;

    @GuardedBy("monitor.monitor")
    private final Queue<InFlightBatch> batchesInFlight;
    private final CloseableMonitor reconnectingMonitor;

    @GuardedBy("reconnectingMonitor.monitor")
    private boolean reconnecting;

    /* renamed from: com.google.cloud.pubsublite.internal.wire.PublisherImpl$2, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PublisherImpl$2.class */
    /**
     * Lookup table (marked synthetic) that translates {@link ApiService.State} constants into
     * small integer case labels for use in a {@code switch} statement.
     *
     * <p>The array is sized to {@code ApiService.State.values().length} and indexed by
     * {@code ordinal()}. The static initializer assigns FAILED -> 1, STARTING -> 2 and
     * RUNNING -> 3; every other slot keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$google$api$core$ApiService$State = new int[ApiService.State.values().length];

        static {
            // Each assignment is wrapped separately so that a failure for one constant does not
            // prevent the remaining entries from being populated.
            try {
                $SwitchMap$com$google$api$core$ApiService$State[ApiService.State.FAILED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // NOTE(review): presumably tolerates a State constant that is absent at runtime — confirm.
            }
            try {
                $SwitchMap$com$google$api$core$ApiService$State[ApiService.State.STARTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Intentionally ignored; the slot keeps its default value of 0.
            }
            try {
                $SwitchMap$com$google$api$core$ApiService$State[ApiService.State.RUNNING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Intentionally ignored; the slot keeps its default value of 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/PublisherImpl$InFlightBatch.class */
    public static class InFlightBatch {
        final List<SerialBatcher.UnbatchedMessage> messages;

        InFlightBatch(List<SerialBatcher.UnbatchedMessage> list) {
            this.messages = list;
        }

        List<PubSubMessage> messagesToSend() {
            return (List) this.messages.stream().map((v0) -> {
                return v0.message();
            }).collect(ImmutableList.toImmutableList());
        }

        PublishSequenceNumber firstSequenceNumber() {
            return this.messages.get(0).sequenceNumber();
        }

        void failBatch(int i, CheckedApiException checkedApiException) {
            for (int i2 = i; i2 < this.messages.size(); i2++) {
                this.messages.get(i2).future().setException(checkedApiException);
            }
        }
    }

    /**
     * Builds a publisher from explicitly supplied collaborators.
     *
     * <p>Initializes all mutable state, verifies that the byte and element-count batching
     * thresholds are set, creates the retrying connection (with this object as its observer) and
     * the batcher, and registers the connection through {@code addServices}.
     *
     * @param publishStreamFactory stream factory handed to the retrying connection
     * @param batchPublisherFactory factory handed to the retrying connection
     * @param alarmFactory used by {@code start()} to schedule the background flush
     * @param initialPublishRequest wrapped into the initial {@link PublishRequest} for the stream
     * @param batchingSettings source of the byte and element-count thresholds; both must be non-null
     * @throws ApiException declared by the original signature
     */
    @VisibleForTesting
    PublisherImpl(StreamFactories.PublishStreamFactory publishStreamFactory, BatchPublisherFactory batchPublisherFactory, AlarmFactory alarmFactory, InitialPublishRequest initialPublishRequest, BatchingSettings batchingSettings) throws ApiException {
        super(new ApiService[0]);
        this.monitor = new CloseableMonitor();
        // Satisfied once nothing is awaiting a response, or once the publisher has shut down.
        this.noneInFlight = new Monitor.Guard(this.monitor.monitor) { // from class: com.google.cloud.pubsublite.internal.wire.PublisherImpl.1
            @Override
            public boolean isSatisfied() {
                if (PublisherImpl.this.batchesInFlight.isEmpty()) {
                    return true;
                }
                return PublisherImpl.this.shutdown;
            }
        };
        this.alarmFuture = Optional.empty();
        this.shutdown = false;
        this.lastSentOffset = Offset.of(-1L);
        this.batcherMonitor = new CloseableMonitor();
        this.batchesInFlight = new ArrayDeque<>();
        this.reconnectingMonitor = new CloseableMonitor();
        this.reconnecting = false;
        this.alarmFactory = alarmFactory;

        // Both thresholds are mandatory; fail with a NullPointerException before wiring anything up.
        long byteThreshold = Preconditions.checkNotNull(batchingSettings.getRequestByteThreshold()).longValue();
        long countThreshold = Preconditions.checkNotNull(batchingSettings.getElementCountThreshold()).longValue();

        PublishRequest.Builder initialBuilder = PublishRequest.newBuilder();
        initialBuilder.setInitialRequest(initialPublishRequest);
        this.initialRequest = initialBuilder.build();
        this.connection = new RetryingConnectionImpl(publishStreamFactory, batchPublisherFactory, this, this.initialRequest);
        this.batcher = new SerialBatcher(byteThreshold, countThreshold);
        addServices(this.connection);
    }

    /**
     * Builds a publisher with a default {@code BatchPublisherImpl.Factory} and an alarm factory
     * whose period is the batching delay threshold.
     *
     * @param publishStreamFactory stream factory handed to the retrying connection
     * @param initialPublishRequest initial request sent when a stream is (re)established
     * @param batchingSettings batching configuration; its delay threshold must be non-null
     * @throws ApiException declared by the original signature
     */
    public PublisherImpl(StreamFactories.PublishStreamFactory publishStreamFactory, InitialPublishRequest initialPublishRequest, BatchingSettings batchingSettings) throws ApiException {
        this(publishStreamFactory, new BatchPublisherImpl.Factory(), alarmFactoryFor(batchingSettings), initialPublishRequest, batchingSettings);
    }

    /** Creates an alarm factory from the (required) delay threshold, converted via nanoseconds. */
    private static AlarmFactory alarmFactoryFor(BatchingSettings batchingSettings) {
        long delayNanos = Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos();
        return AlarmFactory.create(Duration.ofNanos(delayNanos));
    }

    /**
     * Re-partitions every message currently in flight into fresh batches.
     *
     * <p>All messages from {@code batchesInFlight} are flattened in order, the queue is cleared,
     * and the messages are regrouped so that no batch exceeds
     * {@code Constants.MAX_PUBLISH_BATCH_BYTES} serialized bytes or 1000 messages. A single
     * message that is over the byte limit on its own still gets a batch to itself.
     */
    @GuardedBy("monitor.monitor")
    private void rebatchForRestart() {
        Queue<SerialBatcher.UnbatchedMessage> pending = new ArrayDeque<>();
        for (InFlightBatch inFlight : this.batchesInFlight) {
            pending.addAll(inFlight.messages);
        }
        logger.atFiner().log("Re-publishing %s messages after reconnection for partition %s", pending.size(), this.initialRequest.getInitialRequest().getPartition());
        this.batchesInFlight.clear();

        ArrayDeque<SerialBatcher.UnbatchedMessage> current = new ArrayDeque<>();
        long currentBytes = 0;
        int currentCount = 0;
        for (SerialBatcher.UnbatchedMessage next : pending) {
            long nextBytes = next.message().getSerializedSize();
            boolean wouldOverflow = currentBytes + nextBytes > Constants.MAX_PUBLISH_BATCH_BYTES || currentCount + 1 > 1000;
            // Only cut a batch when there is something in it; an oversized lone message must still go out.
            if (wouldOverflow && !current.isEmpty()) {
                this.batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(current)));
                current.clear();
                currentCount = 0;
                currentBytes = 0;
            }
            current.add(next);
            currentBytes += nextBytes;
            currentCount++;
        }
        if (!current.isEmpty()) {
            this.batchesInFlight.add(new InFlightBatch(ImmutableList.copyOf(current)));
        }
    }

    /**
     * Re-establishes the stream and re-sends every batch that was still awaiting a response.
     *
     * <p>While holding {@code monitor}: sets {@code reconnecting}, reinitializes the connection
     * with the initial request, rebatches the in-flight messages, publishes each resulting batch
     * on the connection if one is present, and finally clears {@code reconnecting}. Both monitors
     * are released via try-with-resources on every path, including when a
     * {@link CheckedApiException} is thrown; the previous expanded form left {@code monitor} held
     * in that case. A {@code CheckedApiException} is reported through {@code onPermanentError}
     * after the locks have been released.
     *
     * @param checkedApiException the error that triggered reinitialization (not read here)
     */
    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void triggerReinitialize(CheckedApiException checkedApiException) {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            try (CloseableMonitor.Hold reconnectHold = this.reconnectingMonitor.enter()) {
                this.reconnecting = true;
            }
            this.connection.reinitialize(this.initialRequest);
            rebatchForRestart();
            Queue<InFlightBatch> batches = this.batchesInFlight;
            this.connection.modifyConnection(connectionOr -> {
                if (connectionOr.isPresent()) {
                    batches.forEach(batch -> {
                        ((BatchPublisher) connectionOr.get()).publish(batch.messagesToSend(), batch.firstSequenceNumber());
                    });
                }
            });
            // Not reset on the failure path: the error is then handed to onPermanentError below.
            try (CloseableMonitor.Hold reconnectHold = this.reconnectingMonitor.enter()) {
                this.reconnecting = false;
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /**
     * Shuts the publisher down after a permanent failure.
     *
     * <p>Under {@code monitor}: marks the publisher as shut down, cancels the alarm (without
     * interrupting it) if one is scheduled, and fails every outstanding publish with the given
     * error.
     *
     * @param checkedApiException the error set on all outstanding publish futures
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void handlePermanentError(CheckedApiException checkedApiException) {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            if (this.alarmFuture.isPresent()) {
                this.alarmFuture.get().cancel(false);
            }
            this.alarmFuture = Optional.empty();
            terminateOutstandingPublishes(checkedApiException);
        }
    }

    /**
     * Schedules the background flush: under {@code monitor}, asks the alarm factory for a new
     * alarm that runs {@code backgroundFlushToStream} and remembers its future.
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void start() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            Future<?> alarm = this.alarmFactory.newAlarm(() -> backgroundFlushToStream());
            this.alarmFuture = Optional.of(alarm);
        }
    }

    /**
     * Stops the publisher: flushes, marks it as shut down and cancels the alarm under
     * {@code monitor}, then flushes once more.
     *
     * <p>The second {@code flush()} runs strictly after the monitor hold has been released and
     * outside its resource scope, so a failure in it can no longer trigger a second close of the
     * already released hold.
     */
    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void stop() {
        flush();
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            this.shutdown = true;
            this.alarmFuture.ifPresent(future -> future.cancel(false));
            this.alarmFuture = Optional.empty();
        }
        flush();
    }

    /**
     * Fails every publish that has not completed yet with the given exception.
     *
     * <p>Covers both the batches already in flight and, under {@code batcherMonitor}, whatever
     * the batcher returns from {@code flush()}. Afterwards the in-flight queue is cleared. The
     * clear happens outside the batcher hold's resource scope, so the hold is released exactly
     * once on every path.
     *
     * @param checkedApiException the exception set on each outstanding future
     */
    @GuardedBy("monitor.monitor")
    private void terminateOutstandingPublishes(CheckedApiException checkedApiException) {
        for (InFlightBatch inFlight : this.batchesInFlight) {
            for (SerialBatcher.UnbatchedMessage sent : inFlight.messages) {
                sent.future().setException(checkedApiException);
            }
        }
        try (CloseableMonitor.Hold batcherHold = this.batcherMonitor.enter()) {
            for (List<SerialBatcher.UnbatchedMessage> unsentBatch : this.batcher.flush()) {
                for (SerialBatcher.UnbatchedMessage unsent : unsentBatch) {
                    unsent.future().setException(checkedApiException);
                }
            }
        }
        this.batchesInFlight.clear();
    }

    /**
     * Queues a message for publishing.
     *
     * <p>Under {@code batcherMonitor}, the message is added to the batcher when the service state
     * is STARTING or RUNNING. In any other state a {@link CheckedApiException} with code
     * FAILED_PRECONDITION is raised, reported via {@code onPermanentError}, and returned as an
     * immediately failed future. The batcher hold is released through try-with-resources before
     * the error is handled; the previous expanded form never released it on the failure paths.
     *
     * @param pubSubMessage the message to publish
     * @param publishSequenceNumber the sequence number passed to the batcher with the message
     * @return the future returned by the batcher, or an immediately failed future
     */
    @Override // com.google.cloud.pubsublite.internal.SequencedPublisher
    public ApiFuture<Offset> publish(PubSubMessage pubSubMessage, PublishSequenceNumber publishSequenceNumber) {
        try (CloseableMonitor.Hold batcherHold = this.batcherMonitor.enter()) {
            ApiService.State currentState = state();
            switch (currentState) {
                case FAILED:
                    throw new CheckedApiException("Cannot publish when publisher has failed.", failureCause(), StatusCode.Code.FAILED_PRECONDITION);
                case STARTING:
                case RUNNING:
                    return this.batcher.add(pubSubMessage, publishSequenceNumber);
                default:
                    throw new CheckedApiException("Cannot publish when Publisher state is " + currentState.name(), StatusCode.Code.FAILED_PRECONDITION);
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
            return ApiFutures.immediateFailedFuture(e);
        }
    }

    /**
     * Fails all outstanding publishes with a CANCELLED {@link CheckedApiException}
     * ("Cancelled by client."), while holding {@code monitor}.
     */
    @Override // com.google.cloud.pubsublite.internal.SequencedPublisher
    public void cancelOutstandingPublishes() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            CheckedApiException cancellation = new CheckedApiException("Cancelled by client.", StatusCode.Code.CANCELLED);
            terminateOutstandingPublishes(cancellation);
        }
    }

    /**
     * Alarm callback: flushes batched messages to the stream unless a reconnect is in progress.
     *
     * <p>The {@code reconnecting} flag is read under {@code reconnectingMonitor}; the hold is
     * released before {@code flushToStream()} runs. The flush is outside the hold's resource
     * scope, so an exception from it cannot cause the released hold to be closed a second time.
     */
    private void backgroundFlushToStream() {
        try (CloseableMonitor.Hold reconnectHold = this.reconnectingMonitor.enter()) {
            if (this.reconnecting) {
                return;
            }
        }
        flushToStream();
    }

    /**
     * Drains the batcher and sends each resulting batch on the connection.
     *
     * <p>Does nothing once the publisher is shut down. The batcher is flushed under
     * {@code batcherMonitor}, which is released before the batches are processed; {@code monitor}
     * is held for the whole operation. Both holds are released exactly once on every path. The
     * previous expanded form left {@code monitor} held and closed the batcher hold twice when
     * {@code processBatch} threw. A {@link CheckedApiException} is reported via
     * {@code onPermanentError} after the locks are released.
     */
    private void flushToStream() {
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            if (this.shutdown) {
                return;
            }
            List<List<SerialBatcher.UnbatchedMessage>> batches;
            try (CloseableMonitor.Hold batcherHold = this.batcherMonitor.enter()) {
                batches = this.batcher.flush();
            }
            for (List<SerialBatcher.UnbatchedMessage> batch : batches) {
                processBatch(batch);
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
    }

    /**
     * Records a non-empty batch as in flight and publishes it on the current connection.
     *
     * <p>An empty list is ignored. Otherwise the batch is appended to {@code batchesInFlight}
     * before the connection is touched; if no connection is present the state check fails with
     * "Published after the stream shut down.".
     *
     * @param list the messages forming the batch
     * @throws CheckedApiException if the connection is absent or modifying it fails
     */
    @GuardedBy("monitor.monitor")
    private void processBatch(List<SerialBatcher.UnbatchedMessage> list) throws CheckedApiException {
        if (!list.isEmpty()) {
            InFlightBatch pendingBatch = new InFlightBatch(list);
            this.batchesInFlight.add(pendingBatch);
            this.connection.modifyConnection(connectionOr -> {
                CheckedApiPreconditions.checkState(connectionOr.isPresent(), "Published after the stream shut down.");
                BatchPublisher publisher = (BatchPublisher) connectionOr.get();
                publisher.publish(pendingBatch.messagesToSend(), pendingBatch.firstSequenceNumber());
            });
        }
    }

    /**
     * Sends everything currently batched, then waits (uninterruptibly) until the
     * {@code noneInFlight} guard is satisfied, i.e. no batches are in flight or the publisher has
     * shut down.
     */
    @Override // java.io.Flushable
    public void flush() {
        flushToStream();
        // Entering is the wait; the hold is released again immediately.
        try (CloseableMonitor.Hold waited = this.monitor.enterWhenUninterruptibly(this.noneInFlight)) {
        }
    }

    /**
     * Completes the futures of the oldest in-flight batch from a publish response.
     *
     * <p>The response's cursor ranges are sorted by start index. For each message index in the
     * batch, the message receives {@code startCursor.offset + (index - range.startIndex)} when
     * the index falls inside the current range, and {@code Offset.of(-1)} when no range covers
     * it. Offsets must be strictly greater than the last offset handed out, and consecutive
     * ranges must not overlap.
     *
     * <p>The offset computation now reads the start index from the current cursor range; the
     * previous form referenced an undefined variable there.
     *
     * @param messagePublishResponse the server response for the oldest in-flight batch
     * @throws CheckedApiException with FAILED_PRECONDITION if no batch is in flight, the ranges
     *     overlap, or an offset is not greater than the previous one; the current and all
     *     following messages of the batch are failed with the same exception before it is
     *     rethrown
     */
    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void onClientResponse(MessagePublishResponse messagePublishResponse) throws CheckedApiException {
        Comparator<MessagePublishResponse.CursorRange> byStartIndex = Comparator.comparing(MessagePublishResponse.CursorRange::getStartIndex);
        ImmutableList<MessagePublishResponse.CursorRange> ranges = ImmutableList.sortedCopyOf(byStartIndex, messagePublishResponse.getCursorRangesList());
        try (CloseableMonitor.Hold hold = this.monitor.enter()) {
            CheckedApiPreconditions.checkState(!this.batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
            InFlightBatch batch = this.batchesInFlight.remove();
            int rangeIndex = 0;
            for (int messageIndex = 0; messageIndex < batch.messages.size(); messageIndex++) {
                SerialBatcher.UnbatchedMessage message = batch.messages.get(messageIndex);
                try {
                    // Step to the next range once the current one ends at or before this message.
                    if (rangeIndex < ranges.size() && ranges.get(rangeIndex).getEndIndex() <= messageIndex) {
                        rangeIndex++;
                        if (rangeIndex < ranges.size() && ranges.get(rangeIndex).getStartIndex() < ranges.get(rangeIndex - 1).getEndIndex()) {
                            throw new CheckedApiException(String.format("Server sent invalid cursor ranges in message publish response: %s", messagePublishResponse), StatusCode.Code.FAILED_PRECONDITION);
                        }
                    }
                    boolean covered = rangeIndex < ranges.size()
                            && messageIndex >= ranges.get(rangeIndex).getStartIndex()
                            && messageIndex < ranges.get(rangeIndex).getEndIndex();
                    if (!covered) {
                        // No cursor range covers this message; it completes with offset -1.
                        message.future().set(Offset.of(-1L));
                        continue;
                    }
                    MessagePublishResponse.CursorRange range = ranges.get(rangeIndex);
                    Offset offset = Offset.of((range.getStartCursor().getOffset() + messageIndex) - range.getStartIndex());
                    if (this.lastSentOffset.value() >= offset.value()) {
                        throw new CheckedApiException(String.format("Received publish response with offset %s that is inconsistent with previous offset %s", offset, this.lastSentOffset), StatusCode.Code.FAILED_PRECONDITION);
                    }
                    message.future().set(offset);
                    this.lastSentOffset = offset;
                } catch (CheckedApiException e) {
                    batch.failBatch(messageIndex, e);
                    throw e;
                }
            }
        }
    }
}
