package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CheckedApiPreconditions;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.SerialExecutor;
import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriberImpl;
import com.google.cloud.pubsublite.internal.wire.StreamFactories;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.InitialSubscribeRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SubscribeRequest;
import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:com/google/cloud/pubsublite/internal/wire/SubscriberImpl.class */
public class SubscriberImpl extends ProxyService implements Subscriber, RetryingConnectionObserver<List<SequencedMessage>> {
    private static final Duration FLOW_REQUESTS_FLUSH_INTERVAL = Duration.ofMillis(100);
    private final AlarmFactory alarmFactory;
    private final Consumer<List<SequencedMessage>> messageConsumer;
    private final SubscriberResetHandler resetHandler;
    private final InitialSubscribeRequest baseInitialRequest;
    private final SerialExecutor messageDeliveryExecutor;
    private final CloseableMonitor monitor;

    @GuardedBy("monitor.monitor")
    private Optional<Future<?>> alarmFuture;

    @GuardedBy("monitor.monitor")
    private final RetryingConnection<SubscribeRequest, ConnectedSubscriber> connection;

    @GuardedBy("monitor.monitor")
    private final NextOffsetTracker nextOffsetTracker;

    @GuardedBy("monitor.monitor")
    private final FlowControlBatcher flowControlBatcher;

    @GuardedBy("monitor.monitor")
    private SeekRequest initialLocation;

    @GuardedBy("monitor.monitor")
    private boolean shutdown;

    @VisibleForTesting
    SubscriberImpl(StreamFactories.SubscribeStreamFactory subscribeStreamFactory, ConnectedSubscriberFactory connectedSubscriberFactory, AlarmFactory alarmFactory, InitialSubscribeRequest initialSubscribeRequest, SeekRequest seekRequest, Consumer<List<SequencedMessage>> consumer, SubscriberResetHandler subscriberResetHandler) throws ApiException {
        super(new ApiService[0]);
        this.monitor = new CloseableMonitor();
        this.alarmFuture = Optional.empty();
        this.nextOffsetTracker = new NextOffsetTracker();
        this.flowControlBatcher = new FlowControlBatcher();
        this.shutdown = false;
        this.alarmFactory = alarmFactory;
        this.messageConsumer = consumer;
        this.resetHandler = subscriberResetHandler;
        this.baseInitialRequest = initialSubscribeRequest;
        this.messageDeliveryExecutor = new SerialExecutor(SystemExecutors.getFuturesExecutor());
        this.initialLocation = seekRequest;
        this.connection = new RetryingConnectionImpl(subscribeStreamFactory, connectedSubscriberFactory, this, getInitialRequest());
        addServices(this.connection);
    }

    public SubscriberImpl(StreamFactories.SubscribeStreamFactory subscribeStreamFactory, InitialSubscribeRequest initialSubscribeRequest, SeekRequest seekRequest, Consumer<List<SequencedMessage>> consumer, SubscriberResetHandler subscriberResetHandler) throws ApiException {
        this(subscribeStreamFactory, new ConnectedSubscriberImpl.Factory(), AlarmFactory.create(FLOW_REQUESTS_FLUSH_INTERVAL), initialSubscribeRequest, seekRequest, consumer, subscriberResetHandler);
    }

    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void start() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            this.alarmFuture = Optional.of(this.alarmFactory.newAlarm(this::processBatchFlowRequest));
            if (enter != null) {
                if (0 == 0) {
                    enter.close();
                    return;
                }
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void stop() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            this.shutdown = true;
            this.alarmFuture.ifPresent(future -> {
                future.cancel(false);
            });
            this.alarmFuture = Optional.empty();
            this.messageDeliveryExecutor.close();
            if (enter != null) {
                if (0 == 0) {
                    enter.close();
                    return;
                }
                try {
                    enter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.ProxyService
    protected void handlePermanentError(CheckedApiException checkedApiException) {
        stop();
    }

    @Override // com.google.cloud.pubsublite.internal.wire.Subscriber
    public void allowFlow(FlowControlRequest flowControlRequest) throws CheckedApiException {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            try {
                if (this.shutdown) {
                    if (enter != null) {
                        if (0 == 0) {
                            enter.close();
                            return;
                        }
                        try {
                            enter.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                this.flowControlBatcher.onClientFlowRequest(flowControlRequest);
                if (this.flowControlBatcher.shouldExpediteBatchRequest()) {
                    this.connection.modifyConnection(optional -> {
                        optional.ifPresent(connectedSubscriber -> {
                            flushBatchFlowRequest(connectedSubscriber);
                        });
                    });
                }
                if (enter != null) {
                    if (0 == 0) {
                        enter.close();
                        return;
                    }
                    try {
                        enter.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (enter != null) {
                if (th != null) {
                    try {
                        enter.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    enter.close();
                }
            }
            throw th5;
        }
    }

    private SubscribeRequest getInitialRequest() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            SubscribeRequest build = SubscribeRequest.newBuilder().setInitial(this.baseInitialRequest.toBuilder().setInitialLocation(this.nextOffsetTracker.requestForRestart().orElse(this.initialLocation))).build();
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    enter.close();
                }
            }
            return build;
        } catch (Throwable th3) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    enter.close();
                }
            }
            throw th3;
        }
    }

    public void reset() {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            if (this.shutdown) {
                if (enter != null) {
                    if (0 == 0) {
                        enter.close();
                        return;
                    }
                    try {
                        enter.close();
                        return;
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                        return;
                    }
                }
                return;
            }
            this.nextOffsetTracker.reset();
            this.initialLocation = SeekRequest.newBuilder().setNamedTarget(SeekRequest.NamedTarget.COMMITTED_CURSOR).build();
            if (enter != null) {
                if (0 == 0) {
                    enter.close();
                    return;
                }
                try {
                    enter.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (enter != null) {
                if (0 != 0) {
                    try {
                        enter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    enter.close();
                }
            }
            throw th4;
        }
    }

    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void triggerReinitialize(CheckedApiException checkedApiException) {
        if (ResetSignal.isResetSignal(checkedApiException)) {
            try {
                if (this.resetHandler.handleReset()) {
                    reset();
                }
            } catch (CheckedApiException e) {
                onPermanentError(e);
                return;
            }
        }
        try {
            CloseableMonitor.Hold enter = this.monitor.enter();
            Throwable th = null;
            try {
                try {
                    if (!this.shutdown) {
                        this.connection.reinitialize(getInitialRequest());
                        this.connection.modifyConnection(optional -> {
                            CheckedApiPreconditions.checkArgument(this.monitor.monitor.isOccupiedByCurrentThread());
                            CheckedApiPreconditions.checkArgument(optional.isPresent());
                            this.flowControlBatcher.requestForRestart().ifPresent(flowControlRequest -> {
                                ((ConnectedSubscriber) optional.get()).allowFlow(flowControlRequest);
                            });
                        });
                        if (enter != null) {
                            if (0 != 0) {
                                try {
                                    enter.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                enter.close();
                            }
                        }
                        return;
                    }
                    if (enter != null) {
                        if (0 == 0) {
                            enter.close();
                            return;
                        }
                        try {
                            enter.close();
                            return;
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                            return;
                        }
                    }
                    return;
                } catch (Throwable th4) {
                    th = th4;
                    throw th4;
                }
            } finally {
            }
        } catch (CheckedApiException e2) {
            onPermanentError(e2);
        }
        onPermanentError(e2);
    }

    @Override // com.google.cloud.pubsublite.internal.wire.RetryingConnectionObserver
    public void onClientResponse(List<SequencedMessage> list) throws CheckedApiException {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            try {
                if (this.shutdown) {
                    if (enter != null) {
                        if (0 == 0) {
                            enter.close();
                            return;
                        }
                        try {
                            enter.close();
                            return;
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                            return;
                        }
                    }
                    return;
                }
                this.nextOffsetTracker.onMessages(list);
                this.flowControlBatcher.onMessages(list);
                this.messageDeliveryExecutor.execute(() -> {
                    this.messageConsumer.accept(list);
                });
                if (enter != null) {
                    if (0 == 0) {
                        enter.close();
                        return;
                    }
                    try {
                        enter.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (enter != null) {
                if (th != null) {
                    try {
                        enter.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    enter.close();
                }
            }
            throw th5;
        }
    }

    private void processBatchFlowRequest() {
        try {
            CloseableMonitor.Hold enter = this.monitor.enter();
            Throwable th = null;
            try {
                if (!this.shutdown) {
                    this.connection.modifyConnection(optional -> {
                        optional.ifPresent(connectedSubscriber -> {
                            flushBatchFlowRequest(connectedSubscriber);
                        });
                    });
                    if (enter != null) {
                        if (0 != 0) {
                            try {
                                enter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            enter.close();
                        }
                    }
                    return;
                }
                if (enter != null) {
                    if (0 == 0) {
                        enter.close();
                        return;
                    }
                    try {
                        enter.close();
                        return;
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                        return;
                    }
                }
                return;
            } catch (Throwable th4) {
                if (enter != null) {
                    if (0 != 0) {
                        try {
                            enter.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        enter.close();
                    }
                }
                throw th4;
            }
        } catch (CheckedApiException e) {
            onPermanentError(e);
        }
        onPermanentError(e);
    }

    private void flushBatchFlowRequest(ConnectedSubscriber connectedSubscriber) {
        CloseableMonitor.Hold enter = this.monitor.enter();
        Throwable th = null;
        try {
            try {
                this.flowControlBatcher.releasePendingRequest().ifPresent(flowControlRequest -> {
                    connectedSubscriber.allowFlow(flowControlRequest);
                });
                if (enter != null) {
                    if (0 == 0) {
                        enter.close();
                        return;
                    }
                    try {
                        enter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (enter != null) {
                if (th != null) {
                    try {
                        enter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    enter.close();
                }
            }
            throw th4;
        }
    }
}
