package org.apache.beam.sdk.fn.data;

import java.io.InputStream;
import java.util.function.BiConsumer;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataInboundObserver.class */
public class BeamFnDataInboundObserver<T> implements BiConsumer<ByteString, Boolean>, InboundDataClient {
    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataInboundObserver.class);
    private final LogicalEndpoint endpoint;
    private final FnDataReceiver<T> consumer;
    private final Coder<T> coder;
    private final InboundDataClient readFuture;
    private long byteCounter;
    private long counter;

    public static <T> BeamFnDataInboundObserver<T> forConsumer(LogicalEndpoint logicalEndpoint, Coder<T> coder, FnDataReceiver<T> fnDataReceiver) {
        return new BeamFnDataInboundObserver<>(logicalEndpoint, coder, fnDataReceiver, CompletableFutureInboundDataClient.create());
    }

    public BeamFnDataInboundObserver(LogicalEndpoint logicalEndpoint, Coder<T> coder, FnDataReceiver<T> fnDataReceiver, InboundDataClient inboundDataClient) {
        this.endpoint = logicalEndpoint;
        this.coder = coder;
        this.consumer = fnDataReceiver;
        this.readFuture = inboundDataClient;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.util.function.BiConsumer
    public void accept(ByteString byteString, Boolean bool) {
        if (this.readFuture.isDone()) {
            return;
        }
        try {
            if (byteString.isEmpty() || bool.booleanValue()) {
                LOG.debug("Closing stream for {} having consumed {} values {} bytes", new Object[]{this.endpoint, Long.valueOf(this.counter), Long.valueOf(this.byteCounter)});
                this.readFuture.complete();
                return;
            }
            this.byteCounter += byteString.size();
            InputStream newInput = byteString.newInput();
            while (newInput.available() > 0) {
                this.counter++;
                this.consumer.accept(this.coder.decode(newInput));
            }
        } catch (Exception e) {
            this.readFuture.fail(e);
        }
    }

    @Override // org.apache.beam.sdk.fn.data.InboundDataClient
    public void awaitCompletion() throws Exception {
        this.readFuture.awaitCompletion();
    }

    @Override // org.apache.beam.sdk.fn.data.InboundDataClient
    public boolean isDone() {
        return this.readFuture.isDone();
    }

    @Override // org.apache.beam.sdk.fn.data.InboundDataClient
    public void cancel() {
        this.readFuture.cancel();
    }

    @Override // org.apache.beam.sdk.fn.data.InboundDataClient
    public void complete() {
        this.readFuture.complete();
    }

    @Override // org.apache.beam.sdk.fn.data.InboundDataClient
    public void fail(Throwable th) {
        this.readFuture.fail(th);
    }
}
