package org.apache.beam.repackaged.direct_java.sdk.fn.stream;

import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1p43p2.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/beam/repackaged/direct_java/sdk/fn/stream/DirectStreamObserver.class */
public final class DirectStreamObserver<T> implements StreamObserver<T> {
    private static final Logger LOG = LoggerFactory.getLogger(DirectStreamObserver.class);
    private static final int DEFAULT_MAX_MESSAGES_BEFORE_CHECK = 100;
    private final Phaser phaser;
    private final CallStreamObserver<T> outboundObserver;
    private final int maxMessagesBeforeCheck;
    private AtomicInteger numMessages;

    public DirectStreamObserver(Phaser phaser, CallStreamObserver<T> callStreamObserver) {
        this(phaser, callStreamObserver, DEFAULT_MAX_MESSAGES_BEFORE_CHECK);
    }

    DirectStreamObserver(Phaser phaser, CallStreamObserver<T> callStreamObserver, int i) {
        this.numMessages = new AtomicInteger();
        this.phaser = phaser;
        this.outboundObserver = callStreamObserver;
        this.maxMessagesBeforeCheck = i;
    }

    public void onNext(T t) {
        if (this.maxMessagesBeforeCheck <= 1 || this.numMessages.incrementAndGet() % this.maxMessagesBeforeCheck == 0) {
            int i = 1;
            int i2 = 0;
            int phase = this.phaser.getPhase();
            while (!this.outboundObserver.isReady()) {
                try {
                    this.phaser.awaitAdvanceInterruptibly(phase, i, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (TimeoutException e2) {
                    i2 += i;
                    i *= 2;
                }
            }
            if (i2 > 0) {
                if (phase == this.phaser.getPhase()) {
                    LOG.info("Output channel stalled for {}s, outbound thread {}. See: https://issues.apache.org/jira/browse/BEAM-4280 for the history for this issue.", Integer.valueOf(i2), Thread.currentThread().getName());
                } else {
                    LOG.debug("Output channel stalled for {}s, outbound thread {}.", Integer.valueOf(i2), Thread.currentThread().getName());
                }
            }
        }
        synchronized (this.outboundObserver) {
            this.outboundObserver.onNext(t);
        }
    }

    public void onError(Throwable th) {
        synchronized (this.outboundObserver) {
            this.outboundObserver.onError(th);
        }
    }

    public void onCompleted() {
        synchronized (this.outboundObserver) {
            this.outboundObserver.onCompleted();
        }
    }
}
