/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal;

import java.io.ByteArrayOutputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KubernetesResourceList;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ListOptions;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Status;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.KubernetesClientException;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.Watcher;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.AbstractWatchManager;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.BaseOperation;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.dsl.internal.OperationSupport;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.AsyncBody;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.HttpClient;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.HttpRequest;
import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.client.http.HttpResponse;

/**
 * Watch manager that reads watch events from a streaming HTTP response.
 *
 * <p>The response body is treated as newline-delimited UTF-8 text: every complete line is
 * handed to {@code onMessage}. Request start-up and shutdown are {@code synchronized} on this
 * instance; the current response body is published through a {@code volatile} field because it
 * is assigned from the request's completion callback.
 *
 * @param <T> the watched resource type
 * @param <L> the list type associated with {@code T}
 */
public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
extends AbstractWatchManager<T> {
    /** Future of the request issued by the most recent {@link #start} call; {@code null} before the first start. */
    private CompletableFuture<HttpResponse<AsyncBody>> call;
    /** Body of the most recent response, set once that response has arrived; {@code null} until then. */
    private volatile AsyncBody body;

    /**
     * Creates the manager by delegating to the superclass constructor.
     *
     * @param client            HTTP client used to issue the watch request
     * @param baseOperation     operation the watch is created for
     * @param listOptions       list options for the watch
     * @param watcher           watcher that is forwarded to the superclass
     * @param reconnectInterval reconnect interval, forwarded to the superclass
     * @param reconnectLimit    reconnect limit, forwarded to the superclass
     * @throws MalformedURLException as declared for the superclass constructor
     *         (presumably when the watch URL cannot be built - confirm against AbstractWatchManager)
     */
    public WatchHTTPManager(HttpClient client, BaseOperation<T, L, ?> baseOperation, ListOptions listOptions, Watcher<T> watcher, int reconnectInterval, int reconnectLimit) throws MalformedURLException {
        // Note the order: the superclass takes (reconnectLimit, reconnectInterval), the reverse
        // of this constructor's own parameter order.
        super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, client);
    }

    /**
     * Issues the streaming request and wires up line splitting and completion handling.
     *
     * @param url     target of the watch request
     * @param headers headers to add to the request
     * @param state   request state token passed through to every superclass callback
     */
    @Override
    protected synchronized void start(URL url, Map<String, String> headers, AbstractWatchManager.WatchRequestState state) {
        HttpRequest.Builder builder = this.client.newHttpRequestBuilder().url(url).forStreaming();
        headers.forEach(builder::header);
        // Bytes of the line currently being assembled. Lines are split on the raw byte level and
        // decoded only once complete: 0x0A never occurs inside a multi-byte UTF-8 sequence, so
        // this stays correct when a character is split across two chunks. Decoding each chunk
        // separately and walking CharBuffer.array() would both corrupt such characters and leak
        // the unused tail of the backing array ('\0' chars) into the message.
        ByteArrayOutputStream pendingLine = new ByteArrayOutputStream();
        this.call = this.client.consumeBytes(builder.build(), (b, a) -> {
            for (ByteBuffer content : b) {
                while (content.hasRemaining()) {
                    byte next = content.get();
                    if (next == '\n') {
                        this.onMessage(new String(pendingLine.toByteArray(), StandardCharsets.UTF_8), state);
                        pendingLine.reset();
                        continue;
                    }
                    pendingLine.write(next);
                }
            }
            // NOTE(review): presumably signals readiness for the next batch - confirm against AsyncBody.
            a.consume();
        });
        this.call.whenComplete((response, t) -> {
            if (t != null) {
                this.watchEnded(t, state);
                // whenComplete supplies either a result or a throwable, so there is no response to process.
                return;
            }
            if (response == null) {
                return;
            }
            // Remember the body so closeCurrentRequest() can cancel it.
            this.body = response.body();
            if (!response.isSuccessful()) {
                Status status = OperationSupport.createStatus(response.code(), response.message());
                if (this.onStatus(status, state)) {
                    return;
                }
                this.watchEnded(new KubernetesClientException(status), state);
                return;
            }
            this.resetReconnectAttempts(state);
            this.body.consume();
            // Report the end of the stream, with the failure if there was one.
            this.body.done().whenComplete((v, e) -> this.watchEnded(e, state));
        });
    }

    /**
     * Cancels the pending request future and the current response body, whichever exist.
     */
    @Override
    protected synchronized void closeCurrentRequest() {
        CompletableFuture<HttpResponse<AsyncBody>> currentCall = this.call;
        if (currentCall != null) {
            currentCall.cancel(true);
        }
        // Read the volatile field once so the null check and the call see the same value.
        AsyncBody currentBody = this.body;
        if (currentBody != null) {
            currentBody.cancel();
        }
    }
}

