/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.HandlerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConnectionHandler {
    private static final AtomicReferenceFieldUpdater<ConnectionHandler, ClientCnx> CLIENT_CNX_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ConnectionHandler.class, ClientCnx.class, "clientCnx");
    private volatile ClientCnx clientCnx = null;
    protected final HandlerState state;
    protected final Backoff backoff;
    private static final AtomicLongFieldUpdater<ConnectionHandler> EPOCH_UPDATER = AtomicLongFieldUpdater.newUpdater(ConnectionHandler.class, "epoch");
    private volatile long epoch = -1L;
    protected volatile long lastConnectionClosedTimestamp = 0L;
    private final AtomicBoolean duringConnect = new AtomicBoolean(false);
    protected Connection connection;
    private static final Logger log = LoggerFactory.getLogger(ConnectionHandler.class);

    protected ConnectionHandler(HandlerState state, Backoff backoff, Connection connection) {
        this.state = state;
        this.connection = connection;
        this.backoff = backoff;
        CLIENT_CNX_UPDATER.set(this, null);
    }

    protected void grabCnx() {
        if (!this.duringConnect.compareAndSet(false, true)) {
            log.info("[{}] [{}] Skip grabbing the connection since there is a pending connection", (Object)this.state.topic, (Object)this.state.getHandlerName());
            return;
        }
        if (CLIENT_CNX_UPDATER.get(this) != null) {
            log.warn("[{}] [{}] Client cnx already set, ignoring reconnection request", (Object)this.state.topic, (Object)this.state.getHandlerName());
            return;
        }
        if (!this.isValidStateForReconnection()) {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.state.topic, this.state.getHandlerName(), this.state.getState()});
            return;
        }
        try {
            CompletableFuture<ClientCnx> cnxFuture;
            if (this.state.redirectedClusterURI != null) {
                InetSocketAddress address = InetSocketAddress.createUnresolved(this.state.redirectedClusterURI.getHost(), this.state.redirectedClusterURI.getPort());
                cnxFuture = this.state.client.getConnection(address, address);
            } else {
                cnxFuture = this.state.topic == null ? this.state.client.getConnectionToServiceUrl() : this.state.client.getConnection(this.state.topic);
            }
            ((CompletableFuture)((CompletableFuture)cnxFuture.thenCompose(cnx -> this.connection.connectionOpened((ClientCnx)cnx))).thenAccept(__ -> this.duringConnect.set(false))).exceptionally(this::handleConnectionError);
        }
        catch (Throwable t2) {
            log.warn("[{}] [{}] Exception thrown while getting connection: ", new Object[]{this.state.topic, this.state.getHandlerName(), t2});
            this.reconnectLater(t2);
        }
    }

    private Void handleConnectionError(Throwable exception) {
        try {
            log.warn("[{}] [{}] Error connecting to broker: {}", new Object[]{this.state.topic, this.state.getHandlerName(), exception.getMessage()});
            if (exception instanceof PulsarClientException) {
                this.connection.connectionFailed((PulsarClientException)exception);
            } else if (exception.getCause() instanceof PulsarClientException) {
                this.connection.connectionFailed((PulsarClientException)exception.getCause());
            } else {
                this.connection.connectionFailed(new PulsarClientException(exception));
            }
        }
        catch (Throwable throwable) {
            log.error("[{}] [{}] Unexpected exception after the connection", new Object[]{this.state.topic, this.state.getHandlerName(), throwable});
        }
        this.reconnectLater(exception);
        return null;
    }

    void reconnectLater(Throwable exception) {
        CLIENT_CNX_UPDATER.set(this, null);
        this.duringConnect.set(false);
        if (!this.isValidStateForReconnection()) {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.state.topic, this.state.getHandlerName(), this.state.getState()});
            return;
        }
        long delayMs = this.backoff.next();
        log.warn("[{}] [{}] Could not get connection to broker: {} -- Will try again in {} s", new Object[]{this.state.topic, this.state.getHandlerName(), exception.getMessage(), (double)delayMs / 1000.0});
        if (this.state.changeToConnecting()) {
            this.state.client.timer().newTimeout(timeout -> {
                log.info("[{}] [{}] Reconnecting after connection was closed", (Object)this.state.topic, (Object)this.state.getHandlerName());
                this.grabCnx();
            }, delayMs, TimeUnit.MILLISECONDS);
        } else {
            log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.state.topic, this.state.getHandlerName(), this.state.getState()});
        }
    }

    public void connectionClosed(ClientCnx cnx) {
        this.lastConnectionClosedTimestamp = System.currentTimeMillis();
        this.duringConnect.set(false);
        this.state.client.getCnxPool().releaseConnection(cnx);
        if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
            if (!this.isValidStateForReconnection()) {
                log.info("[{}] [{}] Ignoring reconnection request (state: {})", new Object[]{this.state.topic, this.state.getHandlerName(), this.state.getState()});
                return;
            }
            long delayMs = this.backoff.next();
            this.state.setState(HandlerState.State.Connecting);
            log.info("[{}] [{}] Closed connection {} -- Will try again in {} s", new Object[]{this.state.topic, this.state.getHandlerName(), cnx.channel(), (double)delayMs / 1000.0});
            this.state.client.timer().newTimeout(timeout -> {
                log.info("[{}] [{}] Reconnecting after timeout", (Object)this.state.topic, (Object)this.state.getHandlerName());
                this.grabCnx();
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }

    protected void resetBackoff() {
        this.backoff.reset();
    }

    public ClientCnx cnx() {
        return CLIENT_CNX_UPDATER.get(this);
    }

    protected void setClientCnx(ClientCnx clientCnx) {
        CLIENT_CNX_UPDATER.set(this, clientCnx);
    }

    protected long switchClientCnx(ClientCnx clientCnx) {
        this.setClientCnx(clientCnx);
        return EPOCH_UPDATER.incrementAndGet(this);
    }

    private boolean isValidStateForReconnection() {
        HandlerState.State state = this.state.getState();
        switch (state) {
            case Uninitialized: 
            case Connecting: 
            case RegisteringSchema: 
            case Ready: {
                return true;
            }
            case Closing: 
            case Closed: 
            case Failed: 
            case ProducerFenced: 
            case Terminated: {
                return false;
            }
        }
        return false;
    }

    public long getEpoch() {
        return this.epoch;
    }

    static interface Connection {
        public CompletableFuture<Void> connectionOpened(ClientCnx var1);

        default public void connectionFailed(PulsarClientException e) {
        }
    }
}

