package org.springframework.cloud.stream.app.twitterstream.source;

import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.social.twitter.api.impl.TwitterTemplate;
import org.springframework.util.StringUtils;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.HttpStatusCodeException;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:org/springframework/cloud/stream/app/twitterstream/source/AbstractTwitterInboundChannelAdapter.class */
public abstract class AbstractTwitterInboundChannelAdapter extends MessageProducerSupport {
    private static final AtomicInteger instance = new AtomicInteger();
    private final TwitterTemplate twitter;
    private final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    private final Object monitor = new Object();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger linearBackOff = new AtomicInteger(250);
    private final AtomicInteger httpErrorBackOff = new AtomicInteger(5000);
    private final AtomicInteger rateLimitBackOff = new AtomicInteger(60000);

    /* loaded from: input_file:org/springframework/cloud/stream/app/twitterstream/source/AbstractTwitterInboundChannelAdapter$StreamReadingTask.class */
    protected class StreamReadingTask implements Runnable {
        protected StreamReadingTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (AbstractTwitterInboundChannelAdapter.this.running.get()) {
                try {
                    readStream(AbstractTwitterInboundChannelAdapter.this.twitter.getRestTemplate());
                } catch (Exception e) {
                    AbstractTwitterInboundChannelAdapter.this.logger.warn("Exception while reading stream.", e);
                    AbstractTwitterInboundChannelAdapter.this.waitLinearBackoff();
                } catch (HttpStatusCodeException e2) {
                    if (e2.getStatusCode() == HttpStatus.UNAUTHORIZED) {
                        AbstractTwitterInboundChannelAdapter.this.logger.error("Twitter authentication failed: " + e2.getMessage(), e2);
                        AbstractTwitterInboundChannelAdapter.this.running.set(false);
                    } else if (420 == e2.getStatusCode().value()) {
                        AbstractTwitterInboundChannelAdapter.this.waitRateLimitBackoff();
                    } else {
                        AbstractTwitterInboundChannelAdapter.this.waitHttpErrorBackoff(e2);
                    }
                }
            }
        }

        private void readStream(RestTemplate restTemplate) {
            restTemplate.execute(AbstractTwitterInboundChannelAdapter.this.buildUri(), HttpMethod.GET, clientHttpRequest -> {
            }, clientHttpResponse -> {
                LineNumberReader lineNumberReader = null;
                try {
                    lineNumberReader = new LineNumberReader(new InputStreamReader(clientHttpResponse.getBody()));
                    AbstractTwitterInboundChannelAdapter.this.resetBackOffs();
                    while (AbstractTwitterInboundChannelAdapter.this.running.get()) {
                        String readLine = lineNumberReader.readLine();
                        if (StringUtils.hasText(readLine)) {
                            AbstractTwitterInboundChannelAdapter.this.doSendLine(readLine);
                        }
                    }
                    if (lineNumberReader == null) {
                        return null;
                    }
                    lineNumberReader.close();
                    return null;
                } catch (Throwable th) {
                    if (lineNumberReader != null) {
                        lineNumberReader.close();
                    }
                    throw th;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractTwitterInboundChannelAdapter(TwitterTemplate twitterTemplate) {
        this.twitter = twitterTemplate;
        this.twitter.getRestTemplate().setErrorHandler(new DefaultResponseErrorHandler());
        setPhase(Integer.MAX_VALUE);
    }

    public void setReadTimeout(int i) {
        SimpleClientHttpRequestFactory requestFactory = getRequestFactory();
        if (requestFactory instanceof SimpleClientHttpRequestFactory) {
            requestFactory.setReadTimeout(i);
        } else {
            ((HttpComponentsClientHttpRequestFactory) requestFactory).setReadTimeout(i);
        }
    }

    public void setConnectTimeout(int i) {
        SimpleClientHttpRequestFactory requestFactory = getRequestFactory();
        if (requestFactory instanceof SimpleClientHttpRequestFactory) {
            requestFactory.setConnectTimeout(i);
        } else {
            ((HttpComponentsClientHttpRequestFactory) requestFactory).setConnectTimeout(i);
        }
    }

    protected void onInit() {
        this.taskExecutor.setThreadNamePrefix("twitterSource-" + instance.incrementAndGet() + "-");
        this.taskExecutor.initialize();
    }

    protected void doStart() {
        synchronized (this.monitor) {
            if (this.running.get()) {
                return;
            }
            this.running.set(true);
            this.taskExecutor.execute(new StreamReadingTask());
        }
    }

    protected void doStop() {
        this.running.set(false);
        this.taskExecutor.getThreadPoolExecutor().shutdownNow();
        try {
            if (!this.taskExecutor.getThreadPoolExecutor().awaitTermination(10L, TimeUnit.SECONDS)) {
                this.logger.error("Reader task failed to stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected abstract URI buildUri();

    protected abstract void doSendLine(String str);

    private ClientHttpRequestFactory getRequestFactory() {
        return (ClientHttpRequestFactory) new DirectFieldAccessor(this.twitter.getRestTemplate().getRequestFactory()).getPropertyValue("requestFactory");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void resetBackOffs() {
        this.linearBackOff.set(250);
        this.rateLimitBackOff.set(60000);
        this.httpErrorBackOff.set(5000);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitLinearBackoff() {
        int i = this.linearBackOff.get();
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Exception while reading stream, waiting for " + i + " ms before restarting");
        }
        wait(i);
        if (i < 16000) {
            this.linearBackOff.set(i + 250);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitRateLimitBackoff() {
        int i = this.rateLimitBackOff.get();
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Rate limit error, waiting for " + (i / 1000) + " seconds before restarting");
        }
        wait(i);
        this.rateLimitBackOff.set(i * 2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void waitHttpErrorBackoff(HttpStatusCodeException httpStatusCodeException) {
        int i = this.httpErrorBackOff.get();
        if (this.logger.isWarnEnabled()) {
            this.logger.warn("Http error, waiting for " + (i / 1000) + " seconds before restarting", httpStatusCodeException);
        }
        wait(i);
        if (i < 320000) {
            this.httpErrorBackOff.set(i * 2);
        }
    }

    protected void wait(int i) {
        try {
            Thread.sleep(i);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            if (this.running.get()) {
                throw new IllegalStateException(e);
            }
        }
    }
}
