package com.yahoo.documentapi.messagebus;

import com.yahoo.document.BucketId;
import com.yahoo.document.BucketIdFactory;
import com.yahoo.document.select.parser.ParseException;
import com.yahoo.documentapi.AckToken;
import com.yahoo.documentapi.ProgressToken;
import com.yahoo.documentapi.VisitorControlHandler;
import com.yahoo.documentapi.VisitorDataQueue;
import com.yahoo.documentapi.VisitorIterator;
import com.yahoo.documentapi.VisitorParameters;
import com.yahoo.documentapi.VisitorResponse;
import com.yahoo.documentapi.VisitorSession;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorMessage;
import com.yahoo.documentapi.messagebus.protocol.CreateVisitorReply;
import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.DocumentReply;
import com.yahoo.documentapi.messagebus.protocol.VisitorInfoMessage;
import com.yahoo.documentapi.messagebus.protocol.WrongDistributionReply;
import com.yahoo.messagebus.DestinationSession;
import com.yahoo.messagebus.DestinationSessionParams;
import com.yahoo.messagebus.DynamicThrottlePolicy;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
import com.yahoo.messagebus.ReplyHandler;
import com.yahoo.messagebus.Result;
import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.Trace;
import com.yahoo.messagebus.routing.RoutingTable;
import com.yahoo.protect.Process;
import com.yahoo.vdslib.VisitorStatistics;
import com.yahoo.vdslib.state.ClusterState;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession.class */
public final class MessageBusVisitorSession implements VisitorSession {
    // Class-wide logger used by the session and its nested task classes.
    private static final Logger log;
    // Incremented by getNextSessionId() to hand out a new id per session.
    private static final AtomicLong sessionCounter;
    // Visitor parameters supplied to the constructor; the constructor may fill in route and handlers.
    private final VisitorParameters params;
    // Sends messages; created through the SenderFactory given to the constructor.
    private final Sender sender;
    // Receives visitor messages and carries replies back; created through the ReceiverFactory.
    private final Receiver receiver;
    // Runs the asynchronous tasks (HandleMessageTask, HandleReplyTask, SendCreateVisitorsTask).
    private final AsyncTaskExecutor taskExecutor;
    // Iterator/token pair; the token also serves as the monitor the session synchronizes on.
    private final VisitingProgress progress;
    private final VisitorStatistics statistics;
    // Name produced by createSessionName(); prefixes visitor ids and names the receiver.
    private final String sessionName;
    // Remote data handler when no local data handler is set, otherwise the receiver's connection spec.
    private final String dataDestination;
    // Time source read in start() to record startTimeNanos.
    private final Clock clock;
    // Monitor guarding scheduledHandleReplyTasks.
    private final Object replyTrackingMonitor;
    // Current session state; changed through transitionTo().
    private StateDescription state;
    // Incremented for every visitor id handed out by SendCreateVisitorsTask.
    private long visitorCounter;
    // Value of clock.monotonicNanoTime() captured when start() runs.
    private long startTimeNanos;
    // Number of HandleReplyTask instances submitted but not yet started; guarded by replyTrackingMonitor.
    private long scheduledHandleReplyTasks;
    private boolean scheduledSendCreateVisitors;
    // Set to true under completionMonitor by markSessionCompleted().
    private boolean done;
    private boolean destroying;
    // Monitor notified (notifyAll) once the session is marked done.
    private final Object completionMonitor;
    // Trace created with the trace level taken from the visitor parameters.
    private final Trace trace;
    // Decremented by HandleReplyTask for every reply it processes.
    private int pendingMessageCount;
    // Decompiler rendering of the compiler-generated assertion flag; checked where the original code used assert.
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$AsyncTaskExecutor.class */
    /**
     * Abstraction over the executor that runs the session's asynchronous tasks.
     * {@code ThreadAsyncTaskExecutor} in this file implements it on top of a
     * {@code ScheduledExecutorService}.
     */
    public interface AsyncTaskExecutor {
        /** Hands {@code runnable} over for asynchronous execution. */
        void submitTask(Runnable runnable);

        /**
         * Hands {@code runnable} over for execution after a delay.
         *
         * @param runnable the task to run
         * @param j        the delay, expressed in {@code timeUnit}
         * @param timeUnit the unit of {@code j}
         */
        void scheduleTask(Runnable runnable, long j, TimeUnit timeUnit);
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$Clock.class */
    /**
     * Time source used by the session so that the clock can be substituted;
     * {@code RealClock} in this file backs it with {@code System.nanoTime()}.
     */
    public interface Clock {
        /** Returns the current reading of a monotonic clock, in nanoseconds. */
        long monotonicNanoTime();
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$HandleMessageTask.class */
    private class HandleMessageTask implements Runnable {
        private final Message message;

        private HandleMessageTask(Message message) {
            this.message = message;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (MessageBusVisitorSession.log.isLoggable(Level.FINE)) {
                MessageBusVisitorSession.log.log(Level.FINE, "Visitor session " + MessageBusVisitorSession.this.sessionName + ": Received message " + this.message);
            }
            try {
                if (this.message instanceof VisitorInfoMessage) {
                    MessageBusVisitorSession.this.handleVisitorInfoMessage((VisitorInfoMessage) this.message);
                } else {
                    MessageBusVisitorSession.this.handleDocumentMessage((DocumentMessage) this.message);
                }
            } catch (Throwable th) {
                Process.logAndDie("Caught unhandled error when processing message", th);
            }
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$HandleReplyTask.class */
    private class HandleReplyTask implements Runnable {
        private final Reply reply;
        static final /* synthetic */ boolean $assertionsDisabled;

        HandleReplyTask(Reply reply) {
            this.reply = reply;
        }

        /* JADX WARN: Finally extract failed */
        @Override // java.lang.Runnable
        public void run() {
            synchronized (MessageBusVisitorSession.this.progress.getToken()) {
                MessageBusVisitorSession.this.decrementScheduleHandleReplyTasks();
                try {
                    try {
                    } catch (Throwable th) {
                        MessageBusVisitorSession.this.continueVisiting();
                        throw th;
                    }
                } catch (Exception e) {
                    String str = "Got exception of type " + e.getClass().getName() + " with message '" + e.getMessage() + "' while processing reply in visitor session";
                    MessageBusVisitorSession.log.log(Level.WARNING, str, (Throwable) e);
                    MessageBusVisitorSession.this.transitionTo(new StateDescription(State.FAILED, str));
                    MessageBusVisitorSession.this.continueVisiting();
                } catch (Throwable th2) {
                    Process.logAndDie("Caught unhandled error when running reply task", th2);
                    MessageBusVisitorSession.this.continueVisiting();
                }
                if (!$assertionsDisabled && MessageBusVisitorSession.this.pendingMessageCount <= 0) {
                    throw new AssertionError();
                }
                MessageBusVisitorSession.this.pendingMessageCount--;
                if (this.reply.hasErrors()) {
                    MessageBusVisitorSession.this.handleErrorReply(this.reply);
                } else if (this.reply instanceof CreateVisitorReply) {
                    MessageBusVisitorSession.this.handleCreateVisitorReply((CreateVisitorReply) this.reply);
                } else {
                    String str2 = "Received reply we do not know how to handle: " + this.reply.getClass().getName();
                    MessageBusVisitorSession.log.log(Level.SEVERE, str2);
                    MessageBusVisitorSession.this.transitionTo(new StateDescription(State.FAILED, str2));
                }
                MessageBusVisitorSession.this.continueVisiting();
            }
        }

        static {
            $assertionsDisabled = !MessageBusVisitorSession.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$MessageBusReceiver.class */
    /** {@link Receiver} that forwards every call to a message bus {@link DestinationSession}. */
    public static class MessageBusReceiver implements Receiver {
        private final DestinationSession session;

        /**
         * @param destinationSession the session all calls are delegated to
         */
        public MessageBusReceiver(DestinationSession destinationSession) {
            session = destinationSession;
        }

        /** Passes {@code reply} on to the destination session. */
        @Override
        public void reply(Reply reply) {
            session.reply(reply);
        }

        /** Destroys the destination session. */
        @Override
        public void destroy() {
            session.destroy();
        }

        /** Returns the connection spec reported by the destination session. */
        @Override
        public String getConnectionSpec() {
            final String spec = session.getConnectionSpec();
            return spec;
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$MessageBusReceiverFactory.class */
    /** {@link ReceiverFactory} that creates receivers backed by destination sessions of a {@link MessageBus}. */
    public static class MessageBusReceiverFactory implements ReceiverFactory {
        private final MessageBus bus;

        /**
         * @param messageBus the message bus on which destination sessions are created
         */
        public MessageBusReceiverFactory(MessageBus messageBus) {
            bus = messageBus;
        }

        /**
         * Creates a destination session named {@code str}, with name broadcasting switched off
         * and {@code messageHandler} installed, and wraps it in a {@link MessageBusReceiver}.
         */
        @Override
        public Receiver createReceiver(MessageHandler messageHandler, String str) {
            DestinationSessionParams sessionParams = new DestinationSessionParams();
            sessionParams.setName(str);
            sessionParams.setBroadcastName(false);
            sessionParams.setMessageHandler(messageHandler);
            DestinationSession session = bus.createDestinationSession(sessionParams);
            return new MessageBusReceiver(session);
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$MessageBusSender.class */
    /** {@link Sender} that forwards every call to a message bus {@link SourceSession}. */
    public static class MessageBusSender implements Sender {
        private final SourceSession session;

        /**
         * @param sourceSession the session all calls are delegated to
         */
        public MessageBusSender(SourceSession sourceSession) {
            session = sourceSession;
        }

        /** Sends {@code message} through the source session and returns its result. */
        @Override
        public Result send(Message message) {
            final Result outcome = session.send(message);
            return outcome;
        }

        /** Returns the pending count reported by the source session. */
        @Override
        public int getPendingCount() {
            final int pending = session.getPendingCount();
            return pending;
        }

        /** Destroys the source session. */
        @Override
        public void destroy() {
            session.destroy();
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$MessageBusSenderFactory.class */
    /** {@link SenderFactory} that creates senders backed by source sessions of a {@link MessageBus}. */
    public static class MessageBusSenderFactory implements SenderFactory {
        private final MessageBus bus;

        /**
         * @param messageBus the message bus on which source sessions are created
         */
        public MessageBusSenderFactory(MessageBus messageBus) {
            bus = messageBus;
        }

        /**
         * Creates a source session that reports replies to {@code replyHandler}. The throttle
         * policy is taken from {@code visitorParameters}; when none is set there, a new
         * {@link DynamicThrottlePolicy} is used.
         */
        @Override
        public Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters) {
            SourceSessionParams sessionParams = new SourceSessionParams();
            if (visitorParameters.getThrottlePolicy() == null) {
                sessionParams.setThrottlePolicy(new DynamicThrottlePolicy());
            } else {
                sessionParams.setThrottlePolicy(visitorParameters.getThrottlePolicy());
            }
            SourceSession session = bus.createSourceSession(replyHandler, sessionParams);
            return new MessageBusSender(session);
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$RealClock.class */
    /** {@link Clock} implementation backed by {@link System#nanoTime()}. */
    public static class RealClock implements Clock {
        /** Returns the current value of {@code System.nanoTime()}. */
        @Override
        public long monotonicNanoTime() {
            final long nowNanos = System.nanoTime();
            return nowNanos;
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$Receiver.class */
    /**
     * Receiving side of the session as seen by the session itself.
     * {@code MessageBusReceiver} in this file implements it by delegating to a
     * {@code DestinationSession}.
     */
    public interface Receiver {
        /** Sends {@code reply} back for a previously received message. */
        void reply(Reply reply);

        /** Releases the underlying receiving endpoint. */
        void destroy();

        /** Returns the connection spec of this receiver; the session uses it as visitor control (and possibly data) destination. */
        String getConnectionSpec();
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$ReceiverFactory.class */
    /** Factory through which the session constructor obtains its {@link Receiver}. */
    public interface ReceiverFactory {
        /**
         * Creates a receiver.
         *
         * @param messageHandler handler that is to be given incoming messages
         * @param str            name for the receiver; the session constructor passes its session name
         * @return the new receiver
         */
        Receiver createReceiver(MessageHandler messageHandler, String str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$SendCreateVisitorsTask.class */
    /**
     * Task that is meant to send CreateVisitor messages for the enclosing session.
     *
     * <p>NOTE(review): the decompiler could not reconstruct {@link #run()}; as written here it
     * only throws {@link UnsupportedOperationException}. The helpers below are intact.
     */
    public class SendCreateVisitorsTask implements Runnable {
        // Timeout, in milliseconds, applied to every message built by createMessage().
        private final long messageTimeoutMs;

        /**
         * @param j the timeout in milliseconds stored as {@code messageTimeoutMs}
         */
        SendCreateVisitorsTask(long j) {
            this.messageTimeoutMs = j;
        }

        /** Increments the session's visitor counter and returns {@code <sessionName>-<counter>}. */
        private String getNextVisitorId() {
            StringBuilder sb = new StringBuilder();
            MessageBusVisitorSession.this.visitorCounter++;
            sb.append(MessageBusVisitorSession.this.sessionName).append('-').append(MessageBusVisitorSession.this.visitorCounter);
            return sb.toString();
        }

        /**
         * Builds a CreateVisitor message for one bucket, copying the visiting options from the
         * session's parameters.
         *
         * @param bucketProgress supplies the superbucket and progress bucket placed in the message
         * @return the populated message, with retries disabled
         */
        private CreateVisitorMessage createMessage(VisitorIterator.BucketProgress bucketProgress) {
            // Control destination is the receiver's connection spec; data goes to dataDestination.
            CreateVisitorMessage createVisitorMessage = new CreateVisitorMessage(MessageBusVisitorSession.this.params.getVisitorLibrary(), getNextVisitorId(), MessageBusVisitorSession.this.receiver.getConnectionSpec(), MessageBusVisitorSession.this.dataDestination);
            createVisitorMessage.getTrace().setLevel(MessageBusVisitorSession.this.params.getTraceLevel());
            createVisitorMessage.setTimeRemaining(this.messageTimeoutMs);
            createVisitorMessage.setBuckets(List.of(bucketProgress.getSuperbucket(), bucketProgress.getProgress()));
            createVisitorMessage.setDocumentSelection(MessageBusVisitorSession.this.params.getDocumentSelection());
            createVisitorMessage.setBucketSpace(MessageBusVisitorSession.this.params.getBucketSpace());
            createVisitorMessage.setFromTimestamp(MessageBusVisitorSession.this.params.getFromTimestamp());
            createVisitorMessage.setToTimestamp(MessageBusVisitorSession.this.params.getToTimestamp());
            createVisitorMessage.setMaxPendingReplyCount(MessageBusVisitorSession.this.params.getMaxPending());
            createVisitorMessage.setFieldSet(MessageBusVisitorSession.this.params.fieldSet());
            createVisitorMessage.setVisitInconsistentBuckets(MessageBusVisitorSession.this.params.visitInconsistentBuckets());
            createVisitorMessage.setVisitRemoves(MessageBusVisitorSession.this.params.visitRemoves());
            createVisitorMessage.setParameters(MessageBusVisitorSession.this.params.getLibraryParameters());
            createVisitorMessage.setRoute(MessageBusVisitorSession.this.params.getRoute());
            createVisitorMessage.setMaxBucketsPerVisitor(MessageBusVisitorSession.this.params.getMaxBucketsPerVisitor());
            createVisitorMessage.setPriority(MessageBusVisitorSession.this.params.getPriority());
            createVisitorMessage.setRetryEnabled(false);
            return createVisitorMessage;
        }

        /* JADX WARN: Code restructure failed: missing block: B:19:0x0083, code lost:
        
            r6.this$0.progress.getIterator().update(r0.getSuperbucket(), r0.getProgress());
         */
        // NOTE(review): the fragment above is all that survived of the original body; it shows a
        // bucket being handed back to the iterator via update(). The real logic must be recovered
        // from the original source or bytecode before this class can be used.
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 239
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.yahoo.documentapi.messagebus.MessageBusVisitorSession.SendCreateVisitorsTask.run():void");
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$Sender.class */
    /**
     * Sending side of the session as seen by the session itself.
     * {@code MessageBusSender} in this file implements it by delegating to a
     * {@code SourceSession}.
     */
    public interface Sender {
        /** Sends {@code message} and returns the result of the send attempt. */
        Result send(Message message);

        /** Returns the number of pending messages as reported by the implementation. */
        int getPendingCount();

        /** Releases the underlying sending endpoint. */
        void destroy();
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$SenderFactory.class */
    /** Factory through which the session constructor obtains its {@link Sender}. */
    public interface SenderFactory {
        /**
         * Creates a sender.
         *
         * @param replyHandler      handler that is to be given the replies to sent messages
         * @param visitorParameters the session's visitor parameters
         * @return the new sender
         */
        Sender createSender(ReplyHandler replyHandler, VisitorParameters visitorParameters);
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$State.class */
    /** Lifecycle states of a visitor session; each state records whether it counts as a failure. */
    public enum State {
        NOT_STARTED(false),
        WORKING(false),
        COMPLETED(false),
        ABORTED(true),
        FAILED(true),
        TIMED_OUT(true);

        private final boolean failureState;

        State(boolean failureState) {
            this.failureState = failureState;
        }

        /** Returns {@code true} for ABORTED, FAILED and TIMED_OUT, {@code false} otherwise. */
        public boolean isFailure() {
            return failureState;
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$StateDescription.class */
    /** Immutable pairing of a {@link State} with a free-text description. */
    public static class StateDescription {
        private final State state;
        private final String description;

        /**
         * @param state the state
         * @param str   the description accompanying the state
         */
        public StateDescription(State state, String str) {
            this.state = state;
            this.description = str;
        }

        /** Creates a description of {@code state} with an empty description text. */
        public StateDescription(State state) {
            this(state, "");
        }

        public State getState() {
            return state;
        }

        public String getDescription() {
            return description;
        }

        /**
         * Maps the state to the completion code reported to the control handler.
         *
         * @throws IllegalStateException for states without a completion code (NOT_STARTED, WORKING)
         */
        VisitorControlHandler.CompletionCode toCompletionCode() {
            final VisitorControlHandler.CompletionCode code;
            switch (state) {
                case COMPLETED:
                    code = VisitorControlHandler.CompletionCode.SUCCESS;
                    break;
                case ABORTED:
                    code = VisitorControlHandler.CompletionCode.ABORTED;
                    break;
                case FAILED:
                    code = VisitorControlHandler.CompletionCode.FAILURE;
                    break;
                case TIMED_OUT:
                    code = VisitorControlHandler.CompletionCode.TIMEOUT;
                    break;
                default:
                    throw new IllegalStateException("Current state did not have a valid value: " + state);
            }
            return code;
        }

        /** Returns whether the wrapped state is a failure state. */
        public boolean failed() {
            return state.isFailure();
        }

        @Override
        public String toString() {
            return state + ": " + description;
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$ThreadAsyncTaskExecutor.class */
    /** {@link AsyncTaskExecutor} that hands tasks to a {@link ScheduledExecutorService}. */
    public static class ThreadAsyncTaskExecutor implements AsyncTaskExecutor {
        private final ScheduledExecutorService scheduler;

        /**
         * @param scheduledExecutorService the executor service that runs all tasks
         */
        public ThreadAsyncTaskExecutor(ScheduledExecutorService scheduledExecutorService) {
            scheduler = scheduledExecutorService;
        }

        /** Submits {@code runnable} to the executor service; the returned future is not kept. */
        @Override
        public void submitTask(Runnable runnable) {
            scheduler.submit(runnable);
        }

        /** Schedules {@code runnable} on the executor service after delay {@code j} in {@code timeUnit}. */
        @Override
        public void scheduleTask(Runnable runnable, long j, TimeUnit timeUnit) {
            scheduler.schedule(runnable, j, timeUnit);
        }
    }

    /* loaded from: input_file:com/yahoo/documentapi/messagebus/MessageBusVisitorSession$VisitingProgress.class */
    /** Immutable holder for a {@link VisitorIterator} and the {@link ProgressToken} it was created with. */
    public static class VisitingProgress {
        private final VisitorIterator bucketIterator;
        private final ProgressToken progressToken;

        /**
         * @param visitorIterator the iterator to hold
         * @param progressToken   the token to hold
         */
        public VisitingProgress(VisitorIterator visitorIterator, ProgressToken progressToken) {
            this.bucketIterator = visitorIterator;
            this.progressToken = progressToken;
        }

        /** Returns the iterator given at construction. */
        public VisitorIterator getIterator() {
            return bucketIterator;
        }

        /** Returns the token given at construction. */
        public ProgressToken getToken() {
            return progressToken;
        }
    }

    /** Atomically increments the shared session counter and returns the new value. */
    private static long getNextSessionId() {
        final long nextId = sessionCounter.incrementAndGet();
        return nextId;
    }

    /**
     * Builds a session name of the form {@code visitor-<sessionId>-<currentTimeMillis>}.
     *
     * <p>The wall-clock timestamp is appended after the session id; previously its value was
     * discarded and the id was appended twice.
     *
     * @return the new session name
     */
    private static String createSessionName() {
        // Operands are evaluated left to right: the id is drawn first, then the timestamp is read.
        return "visitor-" + getNextSessionId() + "-" + System.currentTimeMillis();
    }

    /**
     * Creates a session that measures time with a {@link RealClock}; all other arguments are
     * passed unchanged to the six-argument constructor.
     *
     * @throws ParseException propagated from the six-argument constructor
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters, AsyncTaskExecutor asyncTaskExecutor, SenderFactory senderFactory, ReceiverFactory receiverFactory, RoutingTable routingTable) throws ParseException {
        this(
                visitorParameters,
                asyncTaskExecutor,
                senderFactory,
                receiverFactory,
                routingTable,
                new RealClock());
    }

    /**
     * Creates a session in state NOT_STARTED.
     *
     * <p>The statement order matters: the session name must exist before the receiver is
     * created with it, and the route is resolved on the parameters before the sender is created.
     *
     * @param visitorParameters the visiting parameters; route and handlers may be filled in here
     * @param asyncTaskExecutor executor for the session's asynchronous tasks
     * @param senderFactory     creates the sender, given this session's reply handler
     * @param receiverFactory   creates the receiver, given this session's message handler and name
     * @param routingTable      consulted for a storage cluster route when the parameters carry none
     * @param clock             time source read when the session is started
     * @throws ParseException propagated from createVisitingProgress
     * @throws IllegalStateException if no data destination could be determined
     */
    public MessageBusVisitorSession(VisitorParameters visitorParameters, AsyncTaskExecutor asyncTaskExecutor, SenderFactory senderFactory, ReceiverFactory receiverFactory, RoutingTable routingTable, Clock clock) throws ParseException {
        this.sessionName = createSessionName();
        this.replyTrackingMonitor = new Object();
        this.visitorCounter = 0L;
        this.startTimeNanos = 0L;
        this.scheduledHandleReplyTasks = 0L;
        this.scheduledSendCreateVisitors = false;
        this.done = false;
        this.destroying = false;
        this.completionMonitor = new Object();
        this.pendingMessageCount = 0;
        this.params = visitorParameters;
        // Fills in an implicit storage cluster route when the parameters have no usable route.
        initializeRoute(routingTable);
        this.sender = senderFactory.createSender(createReplyHandler(), this.params);
        this.receiver = receiverFactory.createReceiver(createMessageHandler(), this.sessionName);
        this.taskExecutor = asyncTaskExecutor;
        this.progress = createVisitingProgress(this.params);
        this.statistics = new VisitorStatistics();
        this.state = new StateDescription(State.NOT_STARTED);
        this.clock = clock;
        // Resets or creates the data/control handlers and attaches this session to them.
        initializeHandlers(this.params);
        this.trace = new Trace(visitorParameters.getTraceLevel());
        // Without a local data handler the data goes to the remote handler; otherwise to this session's receiver.
        this.dataDestination = this.params.getLocalDataHandler() == null ? this.params.getRemoteDataHandler() : this.receiver.getConnectionSpec();
        validateSessionParameters();
        // An iterator that is already done means there is nothing to visit: complete right away.
        if (this.progress.getIterator().isDone()) {
            markSessionCompleted();
        }
    }

    /**
     * Creates a session wired to a message bus: tasks run on {@code scheduledExecutorService},
     * sender and receiver are created on {@code messageBus}, and routes are looked up in the
     * bus' routing table for the document protocol.
     *
     * @throws ParseException propagated from the session constructor
     */
    public static MessageBusVisitorSession createForMessageBus(MessageBus messageBus, ScheduledExecutorService scheduledExecutorService, VisitorParameters visitorParameters) throws ParseException {
        AsyncTaskExecutor executor = new ThreadAsyncTaskExecutor(scheduledExecutorService);
        SenderFactory senders = new MessageBusSenderFactory(messageBus);
        ReceiverFactory receivers = new MessageBusReceiverFactory(messageBus);
        RoutingTable routes = messageBus.getRoutingTable(DocumentProtocol.NAME);
        return new MessageBusVisitorSession(visitorParameters, executor, senders, receivers, routes);
    }

    /**
     * Verifies that a data destination has been determined.
     *
     * @throws IllegalStateException if {@code dataDestination} is {@code null}
     */
    private void validateSessionParameters() {
        if (dataDestination != null) {
            return;
        }
        throw new IllegalStateException("No data destination specified");
    }

    /**
     * Starts the session: records the start time and, unless the iterator is already done,
     * moves to WORKING and submits the first SendCreateVisitorsTask. Runs while holding the
     * monitor of the progress token.
     */
    public void start() {
        synchronized (progress.getToken()) {
            startTimeNanos = clock.monotonicNanoTime();
            if (!progress.getIterator().isDone()) {
                transitionTo(new StateDescription(State.WORKING));
                final long timeoutMillis = computeBoundedMessageTimeoutMillis(0L);
                taskExecutor.submitTask(new SendCreateVisitorsTask(timeoutMillis));
                return;
            }
            log.log(Level.FINE, () -> sessionName + ": progress token indicates session is done before it could even start; no-op");
        }
    }

    /** Replaces the current state with {@code stateDescription}, except when the current state is a failure. */
    private void updateStateUnlessAlreadyFailed(StateDescription stateDescription) {
        if (!state.failed()) {
            state = stateDescription;
        }
    }

    /**
     * Attempts to move the session to {@code stateDescription}.
     *
     * <ul>
     *   <li>COMPLETED, FAILED and TIMED_OUT only take effect if the session has not already failed.</li>
     *   <li>ABORTED always takes effect.</li>
     *   <li>WORKING takes effect and, with assertions enabled, requires the current state to be NOT_STARTED.</li>
     *   <li>Any other target terminates the process through {@code Process.logAndDie}.</li>
     * </ul>
     *
     * @return the state the session is in after the attempt
     */
    private StateDescription transitionTo(StateDescription stateDescription) {
        log.log(Level.FINE, () -> sessionName + ": attempting transition to state " + stateDescription);
        final State target = stateDescription.getState();
        switch (target) {
            case COMPLETED:
            case FAILED:
            case TIMED_OUT:
                updateStateUnlessAlreadyFailed(stateDescription);
                break;
            case ABORTED:
                state = stateDescription;
                break;
            case WORKING:
                final boolean startedFromScratch = state.getState() == State.NOT_STARTED;
                if (!$assertionsDisabled && !startedFromScratch) {
                    throw new AssertionError();
                }
                state = stateDescription;
                break;
            default:
                Process.logAndDie("Invalid target transition state: " + stateDescription);
                break;
        }
        log.log(Level.FINE, () -> "Session '" + sessionName + "' is now in state " + state);
        return state;
    }

    /** Returns whether any reply task has been scheduled but not yet begun, read under the reply tracking monitor. */
    private boolean hasScheduledHandleReplyTask() {
        synchronized (replyTrackingMonitor) {
            return scheduledHandleReplyTasks != 0;
        }
    }

    /** Counts one more scheduled reply task, under the reply tracking monitor. */
    private void incrementScheduledHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            scheduledHandleReplyTasks += 1;
        }
    }

    /**
     * Counts one scheduled reply task less, under the reply tracking monitor. With assertions
     * enabled, the counter must be positive beforehand.
     */
    private void decrementScheduleHandleReplyTasks() {
        synchronized (replyTrackingMonitor) {
            final boolean assertionsEnabled = !$assertionsDisabled;
            if (assertionsEnabled && scheduledHandleReplyTasks <= 0) {
                throw new AssertionError();
            }
            scheduledHandleReplyTasks -= 1;
        }
    }

    /**
     * Creates the handler that turns each incoming reply into a HandleReplyTask on the task
     * executor. If the executor rejects the task, the bookkeeping is rolled back, the session
     * is moved to FAILED and, unless already done, marked completed.
     */
    private ReplyHandler createReplyHandler() {
        return reply -> {
            // Count the task before submitting so it is never observed as running but uncounted.
            incrementScheduledHandleReplyTasks();
            try {
                taskExecutor.submitTask(new HandleReplyTask(reply));
            } catch (RejectedExecutionException rejected) {
                decrementScheduleHandleReplyTasks();
                log.log(Level.WARNING, "Visitor session '" + sessionName + "': failed to submit reply task to executor service! Session cannot reliably continue; terminating it early.", (Throwable) rejected);
                synchronized (progress.getToken()) {
                    final StateDescription failure = new StateDescription(State.FAILED, "Failed to submit reply task to executor service: " + rejected.getMessage());
                    transitionTo(failure);
                    if (!done) {
                        markSessionCompleted();
                    }
                }
            }
        };
    }

    /**
     * Creates the handler that turns each incoming message into a HandleMessageTask on the
     * task executor. If the executor rejects the task, the message is answered immediately
     * with an ERROR_ABORTED reply.
     */
    private MessageHandler createMessageHandler() {
        return message -> {
            try {
                taskExecutor.submitTask(new HandleMessageTask(message));
            } catch (RejectedExecutionException rejected) {
                DocumentReply abortedReply = ((DocumentMessage) message).createReply();
                message.swapState(abortedReply);
                Error aborted = new Error(DocumentProtocol.ERROR_ABORTED, "Visitor session has been aborted");
                abortedReply.addError(aborted);
                receiver.reply(abortedReply);
            }
        };
    }

    /**
     * Ensures the parameters carry a route. A route that is missing or has no hops is replaced
     * by the single storage cluster route found in {@code routingTable}.
     */
    private void initializeRoute(RoutingTable routingTable) {
        final boolean hasUsableRoute = params.getRoute() != null && params.getRoute().hasHops();
        if (hasUsableRoute) {
            return;
        }
        params.setRoute(getClusterRoute(routingTable));
        log.log(Level.FINE, () -> "No route specified; resolved implicit storage cluster: " + params.getRoute().toString());
    }

    /**
     * Finds the one route in {@code routingTable} whose name starts with {@code storage/cluster.}.
     *
     * @return the name of that route
     * @throws IllegalArgumentException if the table contains no such route, or more than one
     */
    private String getClusterRoute(RoutingTable routingTable) throws IllegalArgumentException {
        String clusterRoute = null;
        for (RoutingTable.RouteIterator routes = routingTable.getRouteIterator(); routes.isValid(); routes.next()) {
            final String routeName = routes.getName();
            if (!routeName.startsWith("storage/cluster.")) {
                continue;
            }
            if (clusterRoute != null) {
                throw new IllegalArgumentException("There are multiple storage clusters in your application, please specify which one to visit.");
            }
            clusterRoute = routeName;
        }
        if (clusterRoute != null) {
            return clusterRoute;
        }
        throw new IllegalArgumentException("No storage cluster found in your application.");
    }

    /**
     * Prepares the data and control handlers of {@code visitorParameters} for this session.
     *
     * <p>An existing local data handler is reset and attached to this session. When there is
     * neither a local nor a remote data handler, a new {@link VisitorDataQueue} is installed as
     * local handler. An existing control handler is reset, a missing one is created; in both
     * cases it is attached to this session.
     */
    private void initializeHandlers(VisitorParameters visitorParameters) {
        if (visitorParameters.getLocalDataHandler() == null) {
            if (visitorParameters.getRemoteDataHandler() == null) {
                // No data destination of any kind: fall back to a local queue.
                visitorParameters.setLocalDataHandler(new VisitorDataQueue());
                visitorParameters.getLocalDataHandler().setSession(this);
            }
        } else {
            visitorParameters.getLocalDataHandler().reset();
            visitorParameters.getLocalDataHandler().setSession(this);
        }

        if (visitorParameters.getControlHandler() == null) {
            visitorParameters.setControlHandler(new VisitorControlHandler());
        } else {
            visitorParameters.getControlHandler().reset();
        }
        visitorParameters.getControlHandler().setSession(this);
    }

    /**
     * Creates the iterator/token pair for this session. The token is the resume token from the
     * parameters, or a new one. The iterator covers the explicit bucket set when one is given,
     * otherwise it is derived from the document selection (with slicing parameters).
     *
     * @throws ParseException propagated from creating the iterator from the document selection
     */
    private VisitingProgress createVisitingProgress(VisitorParameters visitorParameters) throws ParseException {
        final ProgressToken token;
        if (visitorParameters.getResumeToken() == null) {
            token = new ProgressToken();
        } else {
            token = visitorParameters.getResumeToken();
        }

        final boolean explicitBuckets = visitorParameters.getBucketsToVisit() != null && !visitorParameters.getBucketsToVisit().isEmpty();
        final VisitorIterator bucketIterator;
        if (explicitBuckets) {
            if (log.isLoggable(Level.FINE)) {
                log.log(Level.FINE, "parameters specify explicit bucket set to visit; using it rather than document selection (" + visitorParameters.getBucketsToVisit().size() + " buckets given)");
            }
            bucketIterator = VisitorIterator.createFromExplicitBucketSet(visitorParameters.getBucketsToVisit(), 1, token);
        } else {
            bucketIterator = VisitorIterator.createFromDocumentSelection(visitorParameters.getDocumentSelection(), new BucketIdFactory(), 1, token, visitorParameters.getSlices(), visitorParameters.getSliceId());
        }
        return new VisitingProgress(bucketIterator, token);
    }

    /**
     * Drives the session forward: tries to schedule another round of CreateVisitor sending and,
     * only if nothing was scheduled and visiting has completed, marks the session completed.
     */
    private void continueVisiting() {
        if (!scheduleSendCreateVisitorsIfApplicable() && visitingCompleted()) {
            markSessionCompleted();
        }
    }

    /**
     * Finishes the session: notifies the local data handler (if any), settles the final state
     * (FAILED when the token contains failed buckets, otherwise COMPLETED unless already
     * failed), reports it to the control handler and wakes up everyone waiting on the
     * completion monitor.
     */
    private void markSessionCompleted() {
        log.log(Level.FINE, () -> "Visitor session '" + sessionName + "' has completed");
        if (params.getLocalDataHandler() != null) {
            params.getLocalDataHandler().onDone();
        }
        final ProgressToken token = progress.getToken();
        if (token.containsFailedBuckets()) {
            final String firstError = token.getFirstErrorMsg();
            transitionTo(new StateDescription(State.FAILED, firstError));
        }
        // Has no effect on the state when the session has already failed.
        transitionTo(new StateDescription(State.COMPLETED));
        params.getControlHandler().onDone(state.toCompletionCode(), state.getDescription());
        synchronized (completionMonitor) {
            done = true;
            completionMonitor.notifyAll();
        }
    }

    /**
     * Records an exception raised while processing an incoming message: logs it at SEVERE and
     * adds an error to {@code reply}. When the parameters ask to skip buckets on fatal errors
     * the error code is 251008 and the state is left alone; otherwise the code is 250000 and the
     * session is moved to FAILED.
     *
     * @param reply the reply that receives the error
     * @param exc   the exception that was caught
     * @param str   name of the kind of message that was being processed
     */
    private void handleMessageProcessingException(Reply reply, Exception exc, String str) {
        final String description = formatProcessingException(exc, str);
        log.log(Level.SEVERE, formatIdentifyingVisitorErrorString(description), (Throwable) exc);
        final boolean skipBucket;
        synchronized (progress.getToken()) {
            skipBucket = params.skipBucketsOnFatalErrors();
            if (!skipBucket) {
                transitionTo(new StateDescription(State.FAILED, description));
            }
        }
        final int errorCode = skipBucket ? 251008 : 250000;
        reply.addError(new Error(errorCode, description));
    }

    /**
     * Formats a one-line description of an exception caught while processing a message.
     *
     * @param exc the exception
     * @param str name of the kind of message that was being processed
     */
    private String formatProcessingException(Exception exc, String str) {
        final String exceptionType = exc.getClass().getName();
        final String exceptionMessage = exc.getMessage();
        return String.format("Got exception of type %s with message '%s' while processing %s", exceptionType, exceptionMessage, str);
    }

    /** Prefixes {@code str} with the session name and document selection so the message identifies the visitor. */
    private String formatIdentifyingVisitorErrorString(String str) {
        final Object selection = params.getDocumentSelection();
        return String.format("Visitor %s (selection '%s'): %s", sessionName, selection, str);
    }

    /**
     * Handles an incoming VisitorInfo message and always answers it exactly once.
     *
     * <p>A non-empty error message is forwarded to the control handler. While
     * holding the progress token lock, progress is reported to the control
     * handler unless the session is already done, in which case an error is
     * added to the reply instead. Exceptions raised along the way are recorded
     * on the reply via {@link #handleMessageProcessingException}.
     *
     * @param visitorInfoMessage the message to handle
     */
    private void handleVisitorInfoMessage(VisitorInfoMessage visitorInfoMessage) {
        DocumentReply createReply = visitorInfoMessage.createReply();
        visitorInfoMessage.swapState(createReply);
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + this.sessionName + ": Received VisitorInfo with " + visitorInfoMessage.getFinishedBuckets().size() + " finished buckets");
        }
        try {
            if (!visitorInfoMessage.getErrorMessage().isEmpty()) {
                this.params.getControlHandler().onVisitorError(visitorInfoMessage.getErrorMessage());
            }
            synchronized (this.progress.getToken()) {
                if (isDone()) {
                    createReply.addError(new Error(250000, "Visitor has been shut down"));
                } else {
                    this.params.getControlHandler().onProgress(this.progress.getToken());
                }
            }
        } catch (Exception e) {
            // Only record the error here; the finally block is the single place
            // that sends the reply, so it is never sent twice.
            handleMessageProcessingException(createReply, e, "VisitorInfoMessage");
        } finally {
            this.receiver.reply(createReply);
        }
    }

    /**
     * Hands visitor data to the local data handler.
     *
     * <p>Without a local data handler the message is answered immediately with
     * an error. Otherwise the handler receives the message together with an
     * {@link AckToken} wrapping the reply; if the handler throws, the error is
     * recorded and the reply is sent from here.
     *
     * @param documentMessage the data message to handle
     */
    private void handleDocumentMessage(DocumentMessage documentMessage) {
        final DocumentReply reply = documentMessage.createReply();
        documentMessage.swapState(reply);

        if (this.params.getLocalDataHandler() == null) {
            log.log(Level.SEVERE, this.sessionName + ": Got visitor data back to client with no local data destination.");
            reply.addError(new Error(250000, "Visitor data with no local data destination"));
            this.receiver.reply(reply);
            return;
        }

        try {
            this.params.getLocalDataHandler().onMessage(documentMessage, new AckToken(reply));
        } catch (Exception failure) {
            handleMessageProcessingException(reply, failure, "DocumentMessage");
            // Reply right away since the handler failed before it could ack.
            this.receiver.reply(reply);
        }
    }

    /**
     * Tells whether the first error of a reply should be treated as fatal.
     *
     * <p>Three codes are never fatal here; for every other code the error's own
     * {@code isFatal()} verdict is used.
     *
     * @param reply reply whose first error is inspected
     * @return {@code true} if the error is considered fatal
     */
    private boolean isFatalError(Reply reply) {
        final Error firstError = reply.getError(0);
        final int code = firstError.getCode();
        // NOTE(review): REPLY_VISITORINFO (200009) is a message-type constant being
        // compared with an error code; presumably the intended constant is the
        // messagebus timeout error code with the same value. Confirm and rename.
        if (code == DocumentProtocol.ERROR_WRONG_DISTRIBUTION
                || code == DocumentProtocol.ERROR_BUCKET_NOT_FOUND
                || code == DocumentProtocol.REPLY_VISITORINFO) {
            return false;
        }
        return firstError.isFatal();
    }

    /**
     * Tells whether the first error of a reply should be reported to the
     * control handler. Bucket-not-found and bucket-deleted errors are not.
     *
     * @param reply reply whose first error is inspected
     * @return {@code false} for the two suppressed codes, {@code true} otherwise
     */
    private boolean shouldReportError(Reply reply) {
        final int code = reply.getError(0).getCode();
        final boolean suppressed = code == DocumentProtocol.ERROR_BUCKET_NOT_FOUND
                || code == DocumentProtocol.ERROR_BUCKET_DELETED;
        return !suppressed;
    }

    /**
     * Renders an error as {@code "<error name>: <message>"}.
     *
     * @param error the error to render
     * @return the protocol's name for the error code followed by the error message
     */
    private static String getErrorMessage(Error error) {
        final String errorName = DocumentProtocol.getErrorName(error.getCode());
        return errorName + ": " + error.getMessage();
    }

    /**
     * Checks the code of a reply's first error.
     *
     * @param reply reply whose first error is inspected
     * @param i     error code to compare against
     * @return {@code true} if the first error carries exactly that code
     */
    private static boolean isErrorOfType(Reply reply, int i) {
        final int firstErrorCode = reply.getError(0).getCode();
        return firstErrorCode == i;
    }

    /**
     * Forwards an error text to the session's control handler.
     *
     * @param str error text to report
     */
    private void reportVisitorError(String str) {
        params.getControlHandler().onVisitorError(str);
    }

    /**
     * Handles a reply that carries at least one error.
     *
     * <p>The bucket progress is first updated with the two buckets carried by
     * the originating message. A fatal error either fails the whole session
     * (and stops here) or, when buckets are skipped on fatal errors, marks the
     * bucket as failed. Wrong-distribution replies are then delegated to
     * {@link #handleWrongDistributionReply}; for anything else the error is
     * reported if applicable and sending of new visitors is scheduled after
     * 100 ms.
     *
     * @param reply the error reply; its message must be a CreateVisitorMessage
     */
    private void handleErrorReply(Reply reply) {
        final CreateVisitorMessage request = (CreateVisitorMessage) reply.getMessage();
        final BucketId bucket = request.getBuckets().get(0);
        final BucketId subProgress = request.getBuckets().get(1);
        this.progress.getIterator().update(bucket, subProgress);

        final String errorText = getErrorMessage(reply.getError(0));
        log.log(Level.FINE, () -> this.sessionName + ": received error reply for bucket " + bucket + " with message '" + errorText + "'");

        if (isFatalError(reply)) {
            if (this.params.skipBucketsOnFatalErrors()) {
                markBucketProgressAsFailed(bucket, subProgress, errorText);
            } else {
                reportVisitorError(errorText);
                transitionTo(new StateDescription(State.FAILED, errorText));
                return;
            }
        }

        if (isErrorOfType(reply, DocumentProtocol.ERROR_WRONG_DISTRIBUTION)) {
            handleWrongDistributionReply((WrongDistributionReply) reply);
        } else {
            if (shouldReportError(reply)) {
                reportVisitorError(errorText);
            }
            scheduleSendCreateVisitorsIfApplicable(100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Records a bucket as failed in the progress token and then updates the
     * iterator for that bucket with {@code ProgressToken.FINISHED_BUCKET}.
     *
     * @param bucketId  first bucket argument passed on to the token and iterator
     * @param bucketId2 second bucket argument stored with the failure
     * @param str       error message stored with the failure
     */
    private void markBucketProgressAsFailed(BucketId bucketId, BucketId bucketId2, String str) {
        progress.getToken().addFailedBucket(bucketId, bucketId2, str);
        progress.getIterator().update(bucketId, ProgressToken.FINISHED_BUCKET);
    }

    /**
     * Tells whether the number of returned documents has reached the configured
     * maximum. A maximum of {@code -1} never counts as reached.
     *
     * @return {@code true} once documents returned is at least the maximum
     */
    private boolean enoughHitsReceived() {
        if (this.params.getMaxTotalHits() == -1) {
            return false;
        }
        return this.statistics.getDocumentsReturned() >= this.params.getMaxTotalHits();
    }

    /**
     * Tells whether visiting is finished: no messages are pending and the
     * iterator is done, the state has failed, or enough hits were received.
     *
     * @return {@code true} if nothing more is outstanding for this session
     */
    private boolean visitingCompleted() {
        if (this.pendingMessageCount != 0) {
            return false;
        }
        return this.progress.getIterator().isDone()
                || this.state.failed()
                || enoughHitsReceived();
    }

    /**
     * Returns the timeout to use for a single message, in milliseconds.
     *
     * @return 300000 when the configured timeout is infinite (negative),
     *         otherwise the configured timeout but never less than 1
     */
    private long messageTimeoutMillis() {
        final long configured = this.params.getTimeoutMs();
        return isInfiniteTimeout(configured) ? 300000L : Math.max(1L, configured);
    }

    /**
     * Returns the session timeout from the visitor parameters.
     *
     * @return the configured session timeout in milliseconds
     */
    private long sessionTimeoutMillis() {
        return params.getSessionTimeoutMs();
    }

    /**
     * Returns the time between {@code startTimeNanos} and the clock's current
     * monotonic reading.
     *
     * @return elapsed time converted from nanoseconds to milliseconds
     */
    private long elapsedTimeMillis() {
        final long elapsedNanos = this.clock.monotonicNanoTime() - this.startTimeNanos;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    /**
     * Tells whether a timeout value means "no timeout".
     *
     * @param j timeout value in milliseconds
     * @return {@code true} for any negative value
     */
    private static boolean isInfiniteTimeout(long j) {
        return !(j >= 0L);
    }

    /**
     * Computes the message timeout, capped by the time left in the session.
     *
     * @param j time already elapsed in the session, in milliseconds
     * @return the message timeout when the session timeout is infinite;
     *         otherwise the smaller of the message timeout and the remaining
     *         session time, where the remaining time is at least 1
     */
    private long computeBoundedMessageTimeoutMillis(long j) {
        final long perMessageTimeout = messageTimeoutMillis();
        if (isInfiniteTimeout(sessionTimeoutMillis())) {
            return perMessageTimeout;
        }
        final long remainingSessionTime = Math.max(1L, sessionTimeoutMillis() - j);
        return Math.min(remainingSessionTime, perMessageTimeout);
    }

    /**
     * Schedules a task that sends new create-visitor messages, if allowed.
     *
     * <p>If a finite session timeout has expired, a TIMED_OUT transition is
     * requested first. Nothing is scheduled when scheduling is not permitted or
     * visiting has already completed.
     *
     * @param j        delay before the task runs
     * @param timeUnit unit of {@code j}
     * @return {@code true} if a task was scheduled
     */
    private boolean scheduleSendCreateVisitorsIfApplicable(long j, TimeUnit timeUnit) {
        final long elapsed = elapsedTimeMillis();

        final boolean sessionExpired =
                !isInfiniteTimeout(sessionTimeoutMillis()) && elapsed >= sessionTimeoutMillis();
        if (sessionExpired) {
            final String reason = String.format("Session timeout of %d ms expired", sessionTimeoutMillis());
            transitionTo(new StateDescription(State.TIMED_OUT, reason));
        }

        final boolean schedulable = mayScheduleCreateVisitorsTask() && !visitingCompleted();
        if (!schedulable) {
            return false;
        }

        final long boundedTimeout = computeBoundedMessageTimeoutMillis(elapsed);
        this.taskExecutor.scheduleTask(new SendCreateVisitorsTask(boundedTimeout), j, timeUnit);
        this.scheduledSendCreateVisitors = true;
        return true;
    }

    /**
     * Tells whether a new send task may be scheduled: none is scheduled
     * already, the iterator has more to hand out, the state has not failed and
     * the hit limit has not been reached.
     *
     * @return {@code true} if all four conditions hold
     */
    private boolean mayScheduleCreateVisitorsTask() {
        return !this.scheduledSendCreateVisitors
                && this.progress.getIterator().hasNext()
                && !this.state.failed()
                && !enoughHitsReceived();
    }

    /**
     * Schedules sending of new create-visitor messages with zero delay.
     *
     * @return {@code true} if a task was scheduled
     */
    private boolean scheduleSendCreateVisitorsIfApplicable() {
        final long noDelay = 0L;
        return scheduleSendCreateVisitorsIfApplicable(noDelay, TimeUnit.MILLISECONDS);
    }

    /**
     * Handles a successful create-visitor reply.
     *
     * <p>Updates the iterator with the reply's last bucket, reports progress
     * and accumulated statistics to the control handler, and attaches the
     * reply's trace to the session trace as long as that trace is non-empty and
     * the session trace root has fewer than 1000 children.
     *
     * @param createVisitorReply the reply to process
     */
    private void handleCreateVisitorReply(CreateVisitorReply createVisitorReply) {
        final CreateVisitorMessage request = (CreateVisitorMessage) createVisitorReply.getMessage();
        final BucketId superBucket = request.getBuckets().get(0);
        final BucketId bucketProgress = createVisitorReply.getLastBucket();
        log.log(Level.FINE, () -> this.sessionName + ": received CreateVisitorReply for bucket " + superBucket + " with progress " + bucketProgress);

        this.progress.getIterator().update(superBucket, bucketProgress);
        this.params.getControlHandler().onProgress(this.progress.getToken());

        this.statistics.add(createVisitorReply.getVisitorStatistics());
        this.params.getControlHandler().onVisitorStatistics(this.statistics);

        // Cap the number of child traces collected for the session.
        final boolean replyHasTrace = !createVisitorReply.getTrace().getRoot().isEmpty();
        if (replyHasTrace && this.trace.getRoot().getNumChildren() < 1000) {
            this.trace.getRoot().addChild(createVisitorReply.getTrace().getRoot());
        }
    }

    /**
     * Applies the cluster state carried by a wrong-distribution reply.
     *
     * <p>The reply's system state string is parsed into a {@link ClusterState};
     * if its distribution bit count differs from the iterator's, the iterator
     * is updated. Any exception raised while doing so fails the session.
     *
     * @param wrongDistributionReply reply carrying the new system state string
     */
    private void handleWrongDistributionReply(WrongDistributionReply wrongDistributionReply) {
        try {
            int distributionBitCount = new ClusterState(wrongDistributionReply.getSystemState()).getDistributionBitCount();
            if (distributionBitCount != this.progress.getIterator().getDistributionBitCount()) {
                log.log(Level.FINE, () -> "System state changed; now at " + distributionBitCount + " distribution bits");
                this.progress.getIterator().setDistributionBitCount(distributionBitCount);
            }
        } catch (Exception e) {
            // Pass the exception to the logger so the cause and stack trace are kept.
            log.log(Level.SEVERE, "Failed to parse new system state string: " + wrongDistributionReply.getSystemState(), e);
            transitionTo(new StateDescription(State.FAILED, "Failed to parse cluster state '" + wrongDistributionReply.getSystemState() + "'"));
        }
    }

    /**
     * Returns the name of this visitor session.
     *
     * @return the session name
     */
    public String getSessionName() {
        return sessionName;
    }

    /**
     * Reads the {@code done} flag while holding the progress token lock.
     *
     * @return {@code true} once the session has been marked as done
     */
    @Override // com.yahoo.documentapi.VisitorSession
    public boolean isDone() {
        synchronized (this.progress.getToken()) {
            return this.done;
        }
    }

    /**
     * Returns the progress token tracked by this session.
     *
     * @return the session's progress token
     */
    @Override // com.yahoo.documentapi.VisitorSession
    public ProgressToken getProgress() {
        return progress.getToken();
    }

    /**
     * Returns the trace collected by this session.
     *
     * @return the session trace
     */
    @Override // com.yahoo.documentapi.VisitorSession
    public Trace getTrace() {
        return trace;
    }

    /**
     * Waits for completion by delegating to the control handler.
     *
     * @param j timeout argument passed through to the control handler
     * @return whatever the control handler's {@code waitUntilDone} returns
     * @throws InterruptedException if the wait is interrupted
     */
    @Override // com.yahoo.documentapi.VisitorSession
    public boolean waitUntilDone(long j) throws InterruptedException {
        return params.getControlHandler().waitUntilDone(j);
    }

    /**
     * Acknowledges received data by sending the reply wrapped in the token.
     *
     * @param ackToken token whose {@code ackObject} is the reply to send
     */
    @Override // com.yahoo.documentapi.VisitorControlSession
    public void ack(AckToken ackToken) {
        final Object pendingReply = ackToken.ackObject;
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, "Visitor session " + this.sessionName + ": Sending ack " + pendingReply);
        }
        this.receiver.reply((Reply) pendingReply);
    }

    /**
     * Requests a transition to the ABORTED state while holding the progress
     * token lock.
     */
    @Override // com.yahoo.documentapi.VisitorControlSession
    public void abort() {
        synchronized (this.progress.getToken()) {
            final StateDescription aborted = new StateDescription(State.ABORTED, "Visitor aborted by user");
            transitionTo(aborted);
        }
    }

    /**
     * Fetches the next response from the local data handler.
     *
     * @return the local data handler's next response
     * @throws IllegalStateException if no local data handler is configured
     */
    @Override // com.yahoo.documentapi.VisitorControlSession
    public VisitorResponse getNext() {
        if (this.params.getLocalDataHandler() != null) {
            return this.params.getLocalDataHandler().getNext();
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /**
     * Fetches the next response from the local data handler, passing the given
     * argument through to it.
     *
     * @param i argument forwarded to the handler's {@code getNext(int)}
     * @return the local data handler's next response
     * @throws IllegalStateException if no local data handler is configured
     * @throws InterruptedException  if the handler's wait is interrupted
     */
    @Override // com.yahoo.documentapi.VisitorControlSession
    public VisitorResponse getNext(int i) throws InterruptedException {
        if (this.params.getLocalDataHandler() != null) {
            return this.params.getLocalDataHandler().getNext(i);
        }
        throw new IllegalStateException("Data has been routed to external source for this visitor");
    }

    /**
     * Reads the {@code destroying} flag while holding the completion monitor.
     *
     * @return {@code true} once {@link #destroy()} has set the flag
     */
    public boolean isDestroying() {
        synchronized (this.completionMonitor) {
            return this.destroying;
        }
    }

    /**
     * Synchronously destroys the session.
     *
     * <p>If the session is not yet done, an ABORTED transition is requested
     * while holding both the progress token lock and the completion monitor.
     * The token lock is then released and the caller blocks on the completion
     * monitor until {@code done} is set. Whatever the outcome, the sender and
     * receiver are destroyed exactly once on the way out.
     *
     * <p>An interrupt while waiting is logged and ends the wait; it is not
     * propagated to the caller.
     *
     * @throws AssertionError if assertions are enabled and destroy was already
     *                        called on this session
     */
    @Override // com.yahoo.documentapi.VisitorControlSession
    public void destroy() {
        log.log(Level.FINE, () -> this.sessionName + ": synchronous destroy() called");
        try {
            synchronized (this.progress.getToken()) {
                synchronized (this.completionMonitor) {
                    if (!this.done) {
                        transitionTo(new StateDescription(State.ABORTED, "Session explicitly destroyed before completion"));
                    }
                }
            }
            // The token lock is released here, before waiting on the monitor.
            synchronized (this.completionMonitor) {
                if (!$assertionsDisabled && this.destroying) {
                    throw new AssertionError("Attempted to destroy VisitorSession more than once");
                }
                this.destroying = true;
                while (!this.done) {
                    this.completionMonitor.wait();
                }
            }
        } catch (InterruptedException e) {
            log.log(Level.WARNING, "Interrupted waiting for visitor session to be destroyed");
        } finally {
            // Single cleanup path: runs once for normal exit, interrupt and any
            // other throwable, so the interfaces are never destroyed twice.
            try {
                this.sender.destroy();
                this.receiver.destroy();
            } catch (Exception e) {
                log.log(Level.SEVERE, "Caught exception destroying communication interfaces", e);
            }
            log.log(Level.FINE, () -> this.sessionName + ": synchronous destroy() done");
        }
    }

    static {
        $assertionsDisabled = !MessageBusVisitorSession.class.desiredAssertionStatus();
        log = Logger.getLogger(MessageBusVisitorSession.class.getName());
        sessionCounter = new AtomicLong(0L);
    }
}
