package org.apache.druid.server.coordination;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.InputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.druid.concurrent.LifecycleLock;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.server.coordination.ChangeRequestHistory;
import org.apache.druid.server.coordinator.BytesAccumulatingResponseHandler;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/druid/server/coordination/ChangeRequestHttpSyncer.class */
public class ChangeRequestHttpSyncer<T> {
    public static final long HTTP_TIMEOUT_EXTRA_MS = 5000;
    private final ObjectMapper smileMapper;
    private final HttpClient httpClient;
    private final ScheduledExecutorService executor;
    private final URL baseServerURL;
    private final String baseRequestPath;
    private final TypeReference<ChangeRequestsSnapshot<T>> responseTypeReferences;
    private final long serverTimeoutMS;
    private final long serverHttpTimeout;
    private final Duration maxUnstableDuration;
    private final Duration maxDelayBetweenSyncRequests;
    private final Duration maxDurationToWaitForSync;
    private final Listener<T> listener;
    private final String logIdentity;
    private static final EmittingLogger log = new EmittingLogger(ChangeRequestHttpSyncer.class);
    private static final long MAX_RETRY_BACKOFF = TimeUnit.MINUTES.toMillis(2);
    private final CountDownLatch initializationLatch = new CountDownLatch(1);
    private final LifecycleLock startStopLock = new LifecycleLock();
    private int consecutiveFailedAttemptCount = 0;
    private final Stopwatch sinceSyncerStart = Stopwatch.createUnstarted();
    private final Stopwatch sinceLastSyncRequest = Stopwatch.createUnstarted();
    private final Stopwatch sinceLastSyncSuccess = Stopwatch.createUnstarted();
    private final Stopwatch sinceUnstable = Stopwatch.createUnstarted();

    @Nullable
    private ChangeRequestHistory.Counter counter = null;

    /* loaded from: input_file:org/apache/druid/server/coordination/ChangeRequestHttpSyncer$Listener.class */
    public interface Listener<T> {
        void fullSync(List<T> list);

        void deltaSync(List<T> list);
    }

    public ChangeRequestHttpSyncer(ObjectMapper objectMapper, HttpClient httpClient, ScheduledExecutorService scheduledExecutorService, URL url, String str, TypeReference<ChangeRequestsSnapshot<T>> typeReference, long j, long j2, Listener<T> listener) {
        this.smileMapper = objectMapper;
        this.httpClient = httpClient;
        this.executor = scheduledExecutorService;
        this.baseServerURL = url;
        this.baseRequestPath = str;
        this.responseTypeReferences = typeReference;
        this.serverTimeoutMS = j;
        this.serverHttpTimeout = j + HTTP_TIMEOUT_EXTRA_MS;
        this.listener = listener;
        this.logIdentity = StringUtils.format("%s_%d", url, Long.valueOf(System.currentTimeMillis()));
        this.maxDurationToWaitForSync = Duration.millis(3 * this.serverHttpTimeout);
        this.maxDelayBetweenSyncRequests = Duration.millis((3 * this.serverHttpTimeout) + MAX_RETRY_BACKOFF);
        this.maxUnstableDuration = Duration.millis(j2);
    }

    public void start() {
        synchronized (this.startStopLock) {
            if (!this.startStopLock.canStart()) {
                throw new ISE("Could not start sync for server[%s].", this.logIdentity);
            }
            try {
                log.info("Starting sync for server[%s].", this.logIdentity);
                this.startStopLock.started();
                this.startStopLock.exitStart();
                this.sinceSyncerStart.restart();
                addNextSyncToWorkQueue();
            } catch (Throwable th) {
                this.startStopLock.exitStart();
                throw th;
            }
        }
    }

    public void stop() {
        synchronized (this.startStopLock) {
            if (!this.startStopLock.canStop()) {
                throw new ISE("Could not stop sync for server[%s].", this.logIdentity);
            }
            try {
                log.info("Stopping sync for server[%s].", this.logIdentity);
                this.startStopLock.exitStop();
                log.info("Stopped sync for server[%s].", this.logIdentity);
            } catch (Throwable th) {
                this.startStopLock.exitStop();
                throw th;
            }
        }
    }

    public boolean awaitInitialization() throws InterruptedException {
        return this.initializationLatch.await(this.maxDurationToWaitForSync.getMillis(), TimeUnit.MILLISECONDS);
    }

    public boolean isInitialized() {
        return this.initializationLatch.getCount() == 0;
    }

    public Map<String, Object> getDebugInfo() {
        return ImmutableMap.of("millisSinceLastRequest", (Boolean) Long.valueOf(this.sinceLastSyncRequest.millisElapsed()), "millisSinceLastSuccess", (Boolean) Long.valueOf(this.sinceLastSyncSuccess.millisElapsed()), "consecutiveFailedAttemptCount", (Boolean) Integer.valueOf(this.consecutiveFailedAttemptCount), "syncScheduled", Boolean.valueOf(this.startStopLock.isStarted()));
    }

    public boolean needsReset() {
        return this.sinceLastSyncRequest.isRunning() ? this.sinceLastSyncRequest.hasElapsed(this.maxDelayBetweenSyncRequests) : this.sinceSyncerStart.hasElapsed(this.maxDelayBetweenSyncRequests);
    }

    public long getUnstableTimeMillis() {
        if (this.consecutiveFailedAttemptCount <= 0) {
            return 0L;
        }
        return this.sinceUnstable.millisElapsed();
    }

    public boolean isSyncedSuccessfully() {
        if (this.consecutiveFailedAttemptCount > 0) {
            return false;
        }
        return this.sinceLastSyncSuccess.hasNotElapsed(this.maxDurationToWaitForSync);
    }

    private void sync() {
        if (!this.startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
            log.info("Skipping sync for server[%s] as syncer has not started yet.", this.logIdentity);
            return;
        }
        this.sinceLastSyncRequest.restart();
        try {
            String requestString = getRequestString();
            final BytesAccumulatingResponseHandler bytesAccumulatingResponseHandler = new BytesAccumulatingResponseHandler();
            log.debug("Sending sync request to server[%s]", this.logIdentity);
            ListenableFuture go = this.httpClient.go(new Request(HttpMethod.GET, new URL(this.baseServerURL, requestString)).addHeader("Accept", SmileMediaTypes.APPLICATION_JACKSON_SMILE).addHeader("Content-Type", SmileMediaTypes.APPLICATION_JACKSON_SMILE), bytesAccumulatingResponseHandler, Duration.millis(this.serverHttpTimeout));
            log.debug("Sent sync request to [%s]", this.logIdentity);
            Futures.addCallback(go, new FutureCallback<InputStream>() { // from class: org.apache.druid.server.coordination.ChangeRequestHttpSyncer.1
                @Override // com.google.common.util.concurrent.FutureCallback
                public void onSuccess(InputStream inputStream) {
                    int status;
                    synchronized (ChangeRequestHttpSyncer.this.startStopLock) {
                        try {
                            if (!ChangeRequestHttpSyncer.this.startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                                ChangeRequestHttpSyncer.log.info("Not handling response for server[%s] as syncer has not started yet.", ChangeRequestHttpSyncer.this.logIdentity);
                                return;
                            }
                            try {
                                status = bytesAccumulatingResponseHandler.getStatus();
                            } catch (Exception e) {
                                ChangeRequestHttpSyncer.this.markServerUnstableAndAlert(e, "Processing Response");
                                ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                            }
                            if (status == 204) {
                                ChangeRequestHttpSyncer.log.debug("Received NO CONTENT from server[%s]", ChangeRequestHttpSyncer.this.logIdentity);
                                ChangeRequestHttpSyncer.this.sinceLastSyncSuccess.restart();
                                ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                                return;
                            }
                            if (status != 200) {
                                handleFailure(new ISE("Received sync response [%d]", Integer.valueOf(status)));
                                ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                                return;
                            }
                            ChangeRequestHttpSyncer.log.debug("Received sync response from server[%s]", ChangeRequestHttpSyncer.this.logIdentity);
                            ChangeRequestsSnapshot changeRequestsSnapshot = (ChangeRequestsSnapshot) ChangeRequestHttpSyncer.this.smileMapper.readValue(inputStream, ChangeRequestHttpSyncer.this.responseTypeReferences);
                            ChangeRequestHttpSyncer.log.debug("Finished reading sync response from server[%s]", ChangeRequestHttpSyncer.this.logIdentity);
                            if (changeRequestsSnapshot.isResetCounter()) {
                                ChangeRequestHttpSyncer.log.info("Server[%s] requested resetCounter for reason[%s].", ChangeRequestHttpSyncer.this.logIdentity, changeRequestsSnapshot.getResetCause());
                                ChangeRequestHttpSyncer.this.counter = null;
                                ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                                return;
                            }
                            if (ChangeRequestHttpSyncer.this.counter == null) {
                                ChangeRequestHttpSyncer.this.listener.fullSync(changeRequestsSnapshot.getRequests());
                            } else {
                                ChangeRequestHttpSyncer.this.listener.deltaSync(changeRequestsSnapshot.getRequests());
                            }
                            ChangeRequestHttpSyncer.this.counter = changeRequestsSnapshot.getCounter();
                            if (ChangeRequestHttpSyncer.this.initializationLatch.getCount() > 0) {
                                ChangeRequestHttpSyncer.this.initializationLatch.countDown();
                                ChangeRequestHttpSyncer.log.info("Server[%s] synced successfully for the first time.", ChangeRequestHttpSyncer.this.logIdentity);
                            }
                            if (ChangeRequestHttpSyncer.this.consecutiveFailedAttemptCount > 0) {
                                ChangeRequestHttpSyncer.this.consecutiveFailedAttemptCount = 0;
                                ChangeRequestHttpSyncer.this.sinceUnstable.reset();
                                ChangeRequestHttpSyncer.log.info("Server[%s] synced successfully.", ChangeRequestHttpSyncer.this.logIdentity);
                            }
                            ChangeRequestHttpSyncer.this.sinceLastSyncSuccess.restart();
                            ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                        } catch (Throwable th) {
                            ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                            throw th;
                        }
                    }
                }

                @Override // com.google.common.util.concurrent.FutureCallback
                public void onFailure(Throwable th) {
                    synchronized (ChangeRequestHttpSyncer.this.startStopLock) {
                        if (!ChangeRequestHttpSyncer.this.startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                            ChangeRequestHttpSyncer.log.info("Not handling sync failure for server[%s] as syncer has not started yet.", ChangeRequestHttpSyncer.this.logIdentity);
                            return;
                        }
                        try {
                            handleFailure(th);
                            ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                        } catch (Throwable th2) {
                            ChangeRequestHttpSyncer.this.addNextSyncToWorkQueue();
                            throw th2;
                        }
                    }
                }

                private void handleFailure(Throwable th) {
                    ChangeRequestHttpSyncer.this.markServerUnstableAndAlert(th, StringUtils.format("Handling response with code[%d], description[%s]", Integer.valueOf(bytesAccumulatingResponseHandler.getStatus()), bytesAccumulatingResponseHandler.getDescription()));
                }
            }, this.executor);
        } catch (Throwable th) {
            try {
                markServerUnstableAndAlert(th, "Sending Request");
                addNextSyncToWorkQueue();
            } catch (Throwable th2) {
                addNextSyncToWorkQueue();
                throw th2;
            }
        }
    }

    private String getRequestString() {
        return this.counter != null ? StringUtils.format("%s?counter=%s&hash=%s&timeout=%s", this.baseRequestPath, Long.valueOf(this.counter.getCounter()), Long.valueOf(this.counter.getHash()), Long.valueOf(this.serverTimeoutMS)) : StringUtils.format("%s?counter=-1&timeout=%s", this.baseRequestPath, Long.valueOf(this.serverTimeoutMS));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addNextSyncToWorkQueue() {
        synchronized (this.startStopLock) {
            if (!this.startStopLock.awaitStarted(1L, TimeUnit.MILLISECONDS)) {
                log.info("Not scheduling sync for server[%s]. Instance stopped.", this.logIdentity);
                return;
            }
            try {
                if (this.consecutiveFailedAttemptCount > 0) {
                    long min = Math.min(MAX_RETRY_BACKOFF, RetryUtils.nextRetrySleepMillis(this.consecutiveFailedAttemptCount));
                    log.info("Scheduling next sync for server[%s] in [%d] millis.", this.logIdentity, Long.valueOf(min));
                    this.executor.schedule(this::sync, min, TimeUnit.MILLISECONDS);
                } else {
                    this.executor.execute(this::sync);
                }
            } catch (Throwable th) {
                if (this.executor.isShutdown()) {
                    log.warn(th, "Could not schedule sync for server[%s] because executor is stopped.", this.logIdentity);
                } else {
                    log.warn(th, "Could not schedule sync for server [%s]. This syncer will be reset automatically. If the issue persists, try restarting this Druid service.", this.logIdentity);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void markServerUnstableAndAlert(Throwable th, String str) {
        int i = this.consecutiveFailedAttemptCount;
        this.consecutiveFailedAttemptCount = i + 1;
        if (i == 0) {
            this.sinceUnstable.restart();
        }
        String format = StringUtils.format("Sync failed for server[%s] while [%s]. Failed [%d] times in the last [%d] seconds.", this.baseServerURL, str, Integer.valueOf(this.consecutiveFailedAttemptCount), Long.valueOf(getUnstableTimeMillis() / 1000));
        if (this.sinceUnstable.hasElapsed(this.maxUnstableDuration)) {
            log.noStackTrace().makeAlert(th, StringUtils.format("%s. Try restarting the Druid process on server[%s].", format, this.baseServerURL), new Object[0]).emit();
        } else if (log.isDebugEnabled()) {
            log.debug(th, format, new Object[0]);
        } else {
            log.noStackTrace().info(th, format, new Object[0]);
        }
    }

    @VisibleForTesting
    public boolean isExecutorShutdown() {
        return this.executor.isShutdown();
    }
}
