package com.marcnuri.yakc.reactivex;

import com.marcnuri.yakc.KubernetesClient;
import com.marcnuri.yakc.api.ExecMessage;
import com.marcnuri.yakc.api.KubernetesException;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;

/* loaded from: input_file:com/marcnuri/yakc/reactivex/ExecOnSubscribe.class */
public class ExecOnSubscribe implements ObservableOnSubscribe<ExecMessage>, Disposable {
    private final Request request;
    private final KubernetesClient kubernetesClient;
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private AtomicReference<WebSocket> webSocket = new AtomicReference<>(null);

    public ExecOnSubscribe(Request request, KubernetesClient kubernetesClient) {
        this.request = request;
        this.kubernetesClient = kubernetesClient;
    }

    public void subscribe(final ObservableEmitter<ExecMessage> observableEmitter) throws Exception {
        observableEmitter.setDisposable(this);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.webSocket.set(this.kubernetesClient.getOkHttpClient().newWebSocket(this.request.newBuilder().url(this.request.url().newBuilder().addQueryParameter("stdout", "true").addQueryParameter("stderr", "true").build()).build(), new WebSocketListener() { // from class: com.marcnuri.yakc.reactivex.ExecOnSubscribe.1
            public void onMessage(WebSocket webSocket, String str) {
                observableEmitter.onNext(ExecMessage.builder().standardStream(ExecMessage.StandardStream.STDOUT).message(str).build());
            }

            public void onMessage(WebSocket webSocket, ByteString byteString) {
                observableEmitter.onNext(ExecMessage.builder().standardStream(ExecMessage.StandardStream.fromByte(byteString.getByte(0))).message(byteString.substring(1).utf8()).build());
            }

            public void onClosing(WebSocket webSocket, int i, String str) {
                ExecOnSubscribe.this.dispose();
            }

            public void onClosed(WebSocket webSocket, int i, String str) {
                observableEmitter.onComplete();
                close();
            }

            public void onFailure(WebSocket webSocket, Throwable th, Response response) {
                observableEmitter.onError(new KubernetesException((String) Optional.ofNullable(response).map((v0) -> {
                    return v0.body();
                }).map(responseBody -> {
                    try {
                        return responseBody.string();
                    } catch (IOException e) {
                        return null;
                    }
                }).orElse(th.getMessage()), response));
                close();
            }

            private void close() {
                ExecOnSubscribe.this.disposed.set(true);
                countDownLatch.countDown();
            }
        }));
        countDownLatch.await();
    }

    public void dispose() {
        Optional.ofNullable(this.webSocket.get()).ifPresent(webSocket -> {
            webSocket.close(1000, (String) null);
        });
    }

    public boolean isDisposed() {
        return this.disposed.get();
    }
}
