package org.apache.cxf.jaxrs.reactivestreams.server;

import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.container.TimeoutHandler;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.cxf.common.util.StringUtils;
import org.apache.cxf.jaxrs.ext.StreamingResponse;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/apache/cxf/jaxrs/reactivestreams/server/StreamingAsyncSubscriber.class */
/**
 * Subscriber that buffers the items it receives in a queue and streams them to the
 * response through a {@link StreamingResponse}.
 *
 * <p>The streamed output may be wrapped in an optional opening and closing tag, and
 * consecutive items may be divided by an optional separator. Each of the three strings
 * is skipped when it is {@code null}.
 *
 * @param <T> type of the items being streamed
 */
public class StreamingAsyncSubscriber<T> extends AbstractSubscriber<T> {
    /** Items handed over by {@link #onNext(Object)} and not yet written out. */
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private final String openTag;
    private final String closeTag;
    private final String separator;
    /** Maximum time, in milliseconds, a single queue poll waits for an item. */
    private final long pollTimeout;
    private final long asyncTimeout;
    /** Set by {@link #onComplete()} and {@link #onError(Throwable)}; ends the write loop once the queue drains. */
    private volatile boolean completed;
    /** Failure reported through {@link #onError(Throwable)}, if any. */
    private volatile Throwable throwable;
    /** Becomes true the first time the write loop runs, i.e. once the open tag has been handled. */
    private final AtomicBoolean tagsWriteDone = new AtomicBoolean();
    /** Becomes true once the first item has been written. */
    private final AtomicBoolean firstWriteDone = new AtomicBoolean();

    /**
     * Streaming response that drains the enclosing subscriber's queue into the writer
     * until the subscriber is completed and the queue is empty.
     */
    public final class StreamingResponseImpl implements StreamingResponse<T> {
        private StreamingResponseImpl() {
        }

        /**
         * Writes the queued items, framed by the configured tags and separator.
         *
         * <p>An interruption while waiting for an item does not stop the streaming; the
         * interrupt status of the thread is restored before this method exits.
         *
         * @param writer destination of the items and of the raw tag/separator bytes
         * @throws IOException if writing fails, or if the subscriber recorded a failure
         *     that is not a {@link RuntimeException} and no item had been written yet
         */
        @Override
        public void writeTo(StreamingResponse.Writer<T> writer) throws IOException {
            boolean interrupted = false;
            try {
                while (!(completed && queue.isEmpty())) {
                    if (tagsWriteDone.compareAndSet(false, true) && openTag != null) {
                        writeUtf8(writer, openTag);
                    }
                    try {
                        T item = queue.poll(pollTimeout, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            // getAndSet must run for every item so the flag flips on the
                            // first one; a null separator is simply not written.
                            if (firstWriteDone.getAndSet(true) && separator != null) {
                                writeUtf8(writer, separator);
                            }
                            writer.write(item);
                        }
                    } catch (InterruptedException ex) {
                        // Keep streaming, but remember the interruption so that the
                        // flag can be restored for the code further up the stack.
                        interrupted = true;
                    }
                }
                finish(writer);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /** Closes the output after the write loop has ended and reports a recorded failure. */
        private void finish(StreamingResponse.Writer<T> writer) throws IOException {
            if (closeTag != null && tagsWriteDone.get()) {
                writeUtf8(writer, closeTag);
            }
            Throwable failure = throwable;
            if (failure != null) {
                if (firstWriteDone.get()) {
                    // At least one item has already been written to the stream.
                    throw new ResponseStatusOnlyException(failure);
                }
                if (failure instanceof RuntimeException) {
                    throw (RuntimeException) failure;
                }
                if (failure instanceof IOException) {
                    throw (IOException) failure;
                }
                throw new IOException(failure);
            }
            if (tagsWriteDone.get()) {
                return;
            }
            // The write loop never ran: still emit the empty open/close frame.
            if (openTag != null) {
                writeUtf8(writer, openTag);
            }
            if (closeTag != null) {
                writeUtf8(writer, closeTag);
            }
        }

        /** Writes the UTF-8 bytes of {@code text} straight to the writer's entity stream. */
        private void writeUtf8(StreamingResponse.Writer<T> writer, String text) throws IOException {
            writer.getEntityStream().write(StringUtils.toBytesUTF8(text));
        }
    }

    /**
     * Timeout handler registered on the asynchronous response when a positive
     * asynchronous timeout is passed to the constructor.
     */
    public class TimeoutHandlerImpl implements TimeoutHandler {
        public TimeoutHandlerImpl() {
        }

        /**
         * Re-arms the timeout with the subscriber's {@code asyncTimeout} value while no
         * item is queued; otherwise resumes the response with a streaming response.
         *
         * @param asyncResponse the response whose timeout expired
         */
        @Override
        public void handleTimeout(AsyncResponse asyncResponse) {
            if (queue.isEmpty()) {
                asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
            } else {
                StreamingAsyncSubscriber.this.resumeAsyncResponse();
            }
        }
    }

    /**
     * Creates a subscriber with a poll timeout of 1000 milliseconds.
     *
     * @param asyncResponse the asynchronous response to resume
     * @param openTag text written before the first item, or {@code null} for none
     * @param closeTag text written after the last item, or {@code null} for none
     * @param separator text written between items, or {@code null} for none
     */
    public StreamingAsyncSubscriber(AsyncResponse asyncResponse, String openTag, String closeTag,
                                    String separator) {
        this(asyncResponse, openTag, closeTag, separator, 1000L);
    }

    /**
     * Creates a subscriber without an asynchronous timeout.
     *
     * @param asyncResponse the asynchronous response to resume
     * @param openTag text written before the first item, or {@code null} for none
     * @param closeTag text written after the last item, or {@code null} for none
     * @param separator text written between items, or {@code null} for none
     * @param pollTimeout maximum time, in milliseconds, to wait for each queued item
     */
    public StreamingAsyncSubscriber(AsyncResponse asyncResponse, String openTag, String closeTag,
                                    String separator, long pollTimeout) {
        this(asyncResponse, openTag, closeTag, separator, pollTimeout, 0L);
    }

    /**
     * Creates a subscriber and, when {@code asyncTimeout} is positive, sets that timeout
     * on the response and registers a {@link TimeoutHandlerImpl}.
     *
     * @param asyncResponse the asynchronous response to resume
     * @param openTag text written before the first item, or {@code null} for none
     * @param closeTag text written after the last item, or {@code null} for none
     * @param separator text written between items, or {@code null} for none
     * @param pollTimeout maximum time, in milliseconds, to wait for each queued item
     * @param asyncTimeout timeout, in milliseconds, to set on the response; ignored unless positive
     */
    public StreamingAsyncSubscriber(AsyncResponse asyncResponse, String openTag, String closeTag,
                                    String separator, long pollTimeout, long asyncTimeout) {
        super(asyncResponse);
        this.openTag = openTag;
        this.closeTag = closeTag;
        this.separator = separator;
        this.pollTimeout = pollTimeout;
        // NOTE(review): the field is left at 0 regardless of the argument, so onSubscribe
        // always resumes right away and the asyncTimeout branch in onNext never runs.
        // Kept as-is to preserve behavior; confirm whether storing the argument was intended.
        this.asyncTimeout = 0L;
        if (asyncTimeout > 0) {
            asyncResponse.setTimeout(asyncTimeout, TimeUnit.MILLISECONDS);
            asyncResponse.setTimeoutHandler(new TimeoutHandlerImpl());
        }
    }

    /**
     * Resumes the response with a streaming response when no asynchronous timeout is
     * configured, then delegates to the superclass.
     */
    @Override // org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber
    public void onSubscribe(Subscription subscription) {
        if (this.asyncTimeout == 0) {
            resumeAsyncResponse();
        }
        super.onSubscribe(subscription);
    }

    /** Resumes the asynchronous response with a new {@link StreamingResponseImpl}. */
    private void resumeAsyncResponse() {
        StreamingResponse<T> response = new StreamingResponseImpl();
        super.resume(response);
    }

    /** Marks the stream as completed so the write loop stops once the queue is drained. */
    @Override // org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber
    public void onComplete() {
        this.completed = true;
    }

    /**
     * Records the failure, marks the stream as completed and delegates to the superclass.
     *
     * @param th the failure reported by the publisher
     */
    @Override // org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber
    public void onError(Throwable th) {
        // Publish the failure before the completion flag so a writer that observes
        // the completion also observes the failure.
        this.throwable = th;
        this.completed = true;
        super.onError(th);
    }

    /**
     * Queues the item for writing and requests the next one.
     *
     * @param t the item received from the publisher
     */
    @Override // org.apache.cxf.jaxrs.reactivestreams.server.AbstractSubscriber
    public void onNext(T t) {
        if (this.asyncTimeout > 0 && getAsyncResponse().isSuspended()) {
            resumeAsyncResponse();
        }
        this.queue.add(t);
        super.requestNext();
    }
}
