package com.urbanairship.connect.client;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.urbanairship.connect.client.consume.ConnectionRetryStrategy;
import com.urbanairship.connect.client.consume.FullBodyConsumer;
import com.urbanairship.connect.client.consume.MobileEventStreamBodyConsumer;
import com.urbanairship.connect.client.consume.MobileEventStreamConnectFuture;
import com.urbanairship.connect.client.consume.MobileEventStreamResponseHandler;
import com.urbanairship.connect.client.consume.StatusAndHeaders;
import com.urbanairship.connect.client.model.Creds;
import com.urbanairship.connect.client.model.GsonUtil;
import com.urbanairship.connect.client.model.StreamQueryDescriptor;
import com.urbanairship.connect.client.model.request.StartPosition;
import com.urbanairship.connect.client.model.request.StreamRequestPayload;
import com.urbanairship.connect.java8.Consumer;
import io.netty.handler.codec.http.CookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/urbanairship/connect/client/StreamConnection.class */
public class StreamConnection implements AutoCloseable {
    public static final String X_UA_APPKEY = "X-UA-Appkey";
    public static final String ACCEPT_HEADER = "application/vnd.urbanairship+x-ndjson; version=3;";
    private final StreamQueryDescriptor descriptor;
    private final AsyncHttpClient client;
    private final ConnectionRetryStrategy connectionRetryStrategy;
    private final Consumer<String> eventConsumer;
    private final String url;
    private final AtomicBoolean gate;
    private volatile Connection connection;
    private volatile CountDownLatch bodyConsumeLatch;
    private volatile boolean closed;
    private final Object transitionLock;
    private static final Logger log = LoggerFactory.getLogger(StreamConnection.class);
    private static final Gson GSON = GsonUtil.getGson();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/urbanairship/connect/client/StreamConnection$Connection.class */
    public static final class Connection {
        private final ListenableFuture<Boolean> future;
        private final MobileEventStreamResponseHandler handler;

        private Connection(ListenableFuture<Boolean> listenableFuture, MobileEventStreamResponseHandler mobileEventStreamResponseHandler) {
            this.future = listenableFuture;
            this.handler = mobileEventStreamResponseHandler;
        }

        public void consume(final CountDownLatch countDownLatch, Consumer<String> consumer) {
            this.future.addListener(new Runnable() { // from class: com.urbanairship.connect.client.StreamConnection.Connection.1
                @Override // java.lang.Runnable
                public void run() {
                    countDownLatch.countDown();
                }
            }, MoreExecutors.directExecutor());
            this.handler.consumeBody(new MobileEventStreamBodyConsumer(consumer));
        }

        public Optional<Throwable> getConsumeError() {
            return this.handler.getError();
        }

        public void close() {
            try {
                this.handler.stop();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            this.future.done();
        }
    }

    public StreamConnection(StreamQueryDescriptor streamQueryDescriptor, AsyncHttpClient asyncHttpClient, ConnectionRetryStrategy connectionRetryStrategy, Consumer<String> consumer, String str) {
        this.gate = new AtomicBoolean(false);
        this.connection = null;
        this.bodyConsumeLatch = null;
        this.closed = false;
        this.transitionLock = new Object();
        this.descriptor = streamQueryDescriptor;
        this.client = asyncHttpClient;
        this.connectionRetryStrategy = connectionRetryStrategy;
        this.eventConsumer = consumer;
        this.url = str;
    }

    public StreamConnection(StreamQueryDescriptor streamQueryDescriptor, AsyncHttpClient asyncHttpClient, ConnectionRetryStrategy connectionRetryStrategy, Consumer<String> consumer) {
        this(streamQueryDescriptor, asyncHttpClient, connectionRetryStrategy, consumer, streamQueryDescriptor.getEndpointUrl());
    }

    public void read(Optional<StartPosition> optional) throws ConnectionException, InterruptedException {
        if (!this.gate.compareAndSet(false, true)) {
            throw new IllegalStateException("Stream is already consuming!");
        }
        boolean z = false;
        int i = 0;
        Optional<? extends Exception> absent = Optional.absent();
        while (true) {
            i++;
            synchronized (this.transitionLock) {
                if (!this.closed) {
                    absent = begin(optional, i);
                    z = !absent.isPresent();
                    boolean z2 = false;
                    if (!z && !this.closed) {
                        z2 = this.connectionRetryStrategy.shouldRetry(i);
                        if (z2) {
                            Thread.sleep(this.connectionRetryStrategy.getPauseMillis(i));
                        }
                    }
                    if (z || !z2) {
                        break;
                    }
                } else {
                    break;
                }
            }
        }
        if (z || this.closed) {
            if (z) {
                consume();
            }
        } else {
            Exception exc = (Exception) absent.get();
            String format = String.format("Failed to establish connection to event stream after %d attempts", Integer.valueOf(i));
            if (!(exc instanceof ConnectionException)) {
                throw new RuntimeException(format, exc);
            }
            ConnectionException connectionException = (ConnectionException) exc;
            throw new ConnectionException(format, connectionException.getErrorCode(), connectionException);
        }
    }

    private Optional<? extends Exception> begin(Optional<StartPosition> optional, int i) throws InterruptedException {
        try {
            this.connection = connect(Collections.emptyList(), optional);
            this.bodyConsumeLatch = new CountDownLatch(1);
            this.connection.consume(this.bodyConsumeLatch, this.eventConsumer);
            return Optional.absent();
        } catch (ConnectionException e) {
            return Optional.of(e);
        } catch (InterruptedException e2) {
            throw e2;
        } catch (Exception e3) {
            return Optional.of(e3);
        }
    }

    private void consume() throws InterruptedException {
        try {
            this.bodyConsumeLatch.await();
            Optional<Throwable> consumeError = this.connection.getConsumeError();
            if (consumeError.isPresent()) {
                throw new RuntimeException("Error occurred consuming stream for app " + getAppKey(), (Throwable) consumeError.get());
            }
        } finally {
            cleanup();
        }
    }

    private void cleanup() {
        synchronized (this.transitionLock) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.bodyConsumeLatch != null) {
                this.bodyConsumeLatch.countDown();
            }
            if (this.connection != null) {
                this.connection.close();
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        cleanup();
    }

    private Connection connect(Collection<Cookie> collection, Optional<StartPosition> optional) throws InterruptedException, ExecutionException, ConnectionException {
        BoundRequestBuilder buildRequest = buildRequest(collection, optional);
        MobileEventStreamConnectFuture mobileEventStreamConnectFuture = new MobileEventStreamConnectFuture();
        MobileEventStreamResponseHandler mobileEventStreamResponseHandler = new MobileEventStreamResponseHandler(mobileEventStreamConnectFuture);
        ListenableFuture<Boolean> execute = buildRequest.execute(mobileEventStreamResponseHandler);
        try {
            StatusAndHeaders statusAndHeaders = (StatusAndHeaders) mobileEventStreamConnectFuture.get();
            int statusCode = statusAndHeaders.getStatusCode();
            if (statusCode == 200) {
                return new Connection(execute, mobileEventStreamResponseHandler);
            }
            if (statusCode != 307) {
                throw buildErrorException(mobileEventStreamResponseHandler, execute, statusCode);
            }
            return handleRedirect(statusAndHeaders, optional);
        } catch (InterruptedException | ExecutionException e) {
            mobileEventStreamResponseHandler.stop();
            execute.done();
            throw e;
        }
    }

    private ConnectionException buildErrorException(MobileEventStreamResponseHandler mobileEventStreamResponseHandler, ListenableFuture<Boolean> listenableFuture, int i) throws InterruptedException, ExecutionException {
        FullBodyConsumer fullBodyConsumer = new FullBodyConsumer();
        try {
            mobileEventStreamResponseHandler.consumeBody(fullBodyConsumer);
            listenableFuture.get();
            mobileEventStreamResponseHandler.stop();
            listenableFuture.done();
            String m6get = fullBodyConsumer.m6get();
            return (399 >= i || i >= 500) ? new ConnectionException(String.format("Received unexpected status code (%d) from request for stream for app %s. Response body: %s", Integer.valueOf(i), getAppKey(), m6get), i) : new ConnectionException(String.format("Received status code (%d) from a bad request for app %s. Response body: %s", Integer.valueOf(i), getAppKey(), m6get), i);
        } catch (Throwable th) {
            mobileEventStreamResponseHandler.stop();
            listenableFuture.done();
            throw th;
        }
    }

    private BoundRequestBuilder buildRequest(Collection<Cookie> collection, Optional<StartPosition> optional) {
        byte[] query = getQuery(optional);
        BoundRequestBuilder addHeader = this.client.preparePost(this.url).addHeader("Accept", ACCEPT_HEADER).addHeader("Content-Length", Integer.toString(query.length));
        for (Map.Entry<String, String> entry : getAuthHeaders(this.descriptor.getCreds()).entrySet()) {
            addHeader.addHeader(entry.getKey(), entry.getValue());
        }
        Iterator<Cookie> it = collection.iterator();
        while (it.hasNext()) {
            addHeader.addCookie(it.next());
        }
        addHeader.setBody(query);
        return addHeader;
    }

    private Connection handleRedirect(StatusAndHeaders statusAndHeaders, Optional<StartPosition> optional) throws InterruptedException, ExecutionException, ConnectionException {
        String str = statusAndHeaders.getHeaders().get("Set-Cookie");
        if (str == null) {
            throw new ConnectionException("Received redirect response with no 'Set-Cookie' header in response!", statusAndHeaders.getStatusCode());
        }
        Set decode = CookieDecoder.decode(str);
        if (decode == null) {
            throw new ConnectionException("Received redirect response with unparsable 'Set-Cookie' value - " + str, statusAndHeaders.getStatusCode());
        }
        try {
            return connect(new ArrayList(decode), optional);
        } catch (Exception e) {
            throw e;
        }
    }

    private Map<String, String> getAuthHeaders(Creds creds) {
        return ImmutableMap.of("Authorization", "Bearer " + creds.getToken(), X_UA_APPKEY, creds.getAppKey());
    }

    private byte[] getQuery(Optional<StartPosition> optional) {
        return GSON.toJson(new StreamRequestPayload(this.descriptor.getFilters(), this.descriptor.getSubset(), optional, this.descriptor.offsetUpdatesEnabled())).getBytes(StandardCharsets.UTF_8);
    }

    private String getAppKey() {
        return this.descriptor.getCreds().getAppKey();
    }
}
