/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.statetransfer;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.SmallIntSet;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateRequestCommand;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

/**
 * Tracks one inbound state request: a set of segments requested from a single source node.
 *
 * <p>The task exposes a {@link CompletableFuture} that is completed normally once the last chunk
 * of every requested segment has been reported through {@link #onStateReceived(int, boolean)},
 * and completed exceptionally when the request fails, the task is cancelled, or
 * {@link #terminate()} is called.
 *
 * <p>{@code segments} and {@code finishedSegments} are only read or modified while holding the
 * monitor of {@code segments}; {@code isCancelled} is a volatile flag read without that lock.
 */
public class InboundTransferTask {
    private static final Log log = LogFactory.getLog(InboundTransferTask.class);
    private static final boolean trace = log.isTraceEnabled();
    /** Segments still requested from the source; guarded by its own monitor. */
    private final SmallIntSet segments;
    /** Requested segments whose last chunk has been received; guarded by the monitor of {@code segments}. */
    private final SmallIntSet finishedSegments;
    private final Address source;
    private volatile boolean isCancelled = false;
    private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();
    private final int topologyId;
    private final RpcManager rpcManager;
    private final CommandsFactory commandsFactory;
    private final long timeout;
    private final String cacheName;
    private final RpcOptions rpcOptions;
    /** Selects between the state-transfer and the consistency-check command types. */
    private final boolean applyState;

    /**
     * Creates a task for the given segments and source.
     *
     * @param segments        segments to request; copied, must not be {@code null} or empty
     * @param source          node the segments are requested from; must not be {@code null}
     * @param topologyId      topology id sent with every request command
     * @param rpcManager      used to send the commands and to build the request RPC options
     * @param commandsFactory used to build the {@link StateRequestCommand}s
     * @param timeout         timeout of the start request, in milliseconds
     * @param cacheName       cache name, used in log messages only
     * @param applyState      {@code true} to request a state transfer, {@code false} for a consistency check
     * @throws IllegalArgumentException if {@code segments} is {@code null} or empty, or {@code source} is {@code null}
     */
    public InboundTransferTask(Set<Integer> segments, Address source, int topologyId, RpcManager rpcManager, CommandsFactory commandsFactory, long timeout, String cacheName, boolean applyState) {
        if (segments == null || segments.isEmpty()) {
            throw new IllegalArgumentException("segments must not be null or empty");
        }
        if (source == null) {
            throw new IllegalArgumentException("Source address cannot be null");
        }
        this.segments = new SmallIntSet(segments);
        this.finishedSegments = new SmallIntSet();
        this.source = source;
        this.topologyId = topologyId;
        this.rpcManager = rpcManager;
        this.commandsFactory = commandsFactory;
        this.timeout = timeout;
        this.cacheName = cacheName;
        this.applyState = applyState;
        this.rpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS).timeout(timeout, TimeUnit.MILLISECONDS).build();
    }

    /**
     * @return a snapshot copy of the segments currently requested by this task
     */
    public SmallIntSet getSegments() {
        synchronized (this.segments) {
            return new SmallIntSet(this.segments);
        }
    }

    /**
     * @return a snapshot copy of the requested segments that have not been marked finished yet
     */
    public SmallIntSet getUnfinishedSegments() {
        synchronized (this.segments) {
            SmallIntSet unfinishedSegments = new SmallIntSet(this.segments);
            unfinishedSegments.removeAll((Collection<?>) this.finishedSegments);
            return unfinishedSegments;
        }
    }

    /**
     * @return the node this task requests state from
     */
    public Address getSource() {
        return this.source;
    }

    /**
     * Sends the start request for the segments: a state transfer when {@code applyState} is set,
     * a consistency check otherwise.
     *
     * @return the completion future of this task
     */
    public CompletableFuture<Void> requestSegments() {
        return this.startTransfer(this.applyState ? StateRequestCommand.Type.START_STATE_TRANSFER : StateRequestCommand.Type.START_CONSISTENCY_CHECK);
    }

    /**
     * Sends a {@code START_KEYS_TRANSFER} request for the segments.
     *
     * @return the completion future of this task
     */
    public CompletableFuture<Void> requestKeys() {
        return this.startTransfer(StateRequestCommand.Type.START_KEYS_TRANSFER);
    }

    /**
     * Sends a start command of the given type to the source and interprets the response.
     *
     * <p>Nothing is sent when the task is already cancelled. When no segments are left the future
     * is completed normally right away. A non-successful response or any exception thrown while
     * sending completes the future exceptionally; a successful response leaves the future pending.
     *
     * @param type the start command type to send
     * @return the completion future of this task
     */
    private CompletableFuture<Void> startTransfer(StateRequestCommand.Type type) {
        if (this.isCancelled) {
            return this.completionFuture;
        }
        SmallIntSet segmentsCopy = this.getSegments();
        if (segmentsCopy.isEmpty()) {
            log.tracef("Segments list is empty, skipping source %s", this.source);
            this.completionFuture.complete(null);
            return this.completionFuture;
        }
        if (trace) {
            log.tracef("Requesting state (%s) from node %s for segments %s", (Object)type, this.source, segmentsCopy);
        }
        try {
            StateRequestCommand cmd = this.commandsFactory.buildStateRequestCommand(type, this.rpcManager.getAddress(), this.topologyId, (Set<Integer>)segmentsCopy);
            Map<Address, Response> responses = this.rpcManager.invokeRemotely(Collections.singleton(this.source), cmd, this.rpcOptions);
            Response response = responses.get(this.source);
            if (response instanceof SuccessfulResponse) {
                if (trace) {
                    log.tracef("Successfully requested state (%s) from node %s for segments %s", (Object)type, this.source, segmentsCopy);
                }
                return this.completionFuture;
            }
            // Any other response (including a missing one) is a failure; reuse the remote
            // exception when there is one, otherwise wrap the textual form of the response.
            Throwable failure = response instanceof ExceptionResponse ? ((ExceptionResponse)response).getException() : new CacheException(String.valueOf(response));
            log.failedToRequestSegments(this.cacheName, this.source, (Collection<Integer>)segmentsCopy, failure);
            this.completionFuture.completeExceptionally(failure);
        } catch (Exception e) {
            log.failedToRequestSegments(this.cacheName, this.source, (Collection<Integer>)segmentsCopy, e);
            this.completionFuture.completeExceptionally(e);
        }
        return this.completionFuture;
    }

    /**
     * Cancels a subset of the requested segments and sends a cancel command for them to the source.
     * If no segments remain afterwards the whole task becomes cancelled and the future is completed
     * exceptionally with a {@link CancellationException}.
     *
     * @param cancelledSegments the segments to cancel; all of them must currently be requested
     * @throws IllegalArgumentException if the task is already cancelled, or if some of the given
     *                                  segments are not currently requested by this task
     */
    public void cancelSegments(Set<Integer> cancelledSegments) {
        if (this.isCancelled) {
            throw new IllegalArgumentException("The task is already cancelled.");
        }
        if (trace) {
            log.tracef("Partially cancelling inbound state transfer from node %s, segments %s", this.source, cancelledSegments);
        }
        synchronized (this.segments) {
            if (!this.segments.containsAll(cancelledSegments)) {
                throw new IllegalArgumentException("Some of the specified segments cannot be cancelled because they were not previously requested");
            }
            this.segments.removeAll(cancelledSegments);
            this.finishedSegments.removeAll(cancelledSegments);
            // NOTE(review): if every remaining segment is already finished at this point, nothing in
            // this method completes the future; it stays pending until cancel()/terminate() is called.
            // Confirm whether callers rely on that.
            if (this.segments.isEmpty()) {
                this.isCancelled = true;
            }
        }
        // The remote call is made outside the lock.
        this.sendCancelCommand(cancelledSegments);
        if (this.isCancelled) {
            this.notifyCompletion(false);
        }
    }

    /**
     * Cancels the whole task: marks it cancelled, sends a cancel command for all currently
     * requested segments and completes the future exceptionally with a
     * {@link CancellationException}. Does nothing if the task is already cancelled.
     */
    public void cancel() {
        if (!this.isCancelled) {
            this.isCancelled = true;
            SmallIntSet segmentsCopy = this.getSegments();
            if (trace) {
                log.tracef("Cancelling inbound state transfer from %s with segments %s", this.source, segmentsCopy);
            }
            this.sendCancelCommand((Set<Integer>)segmentsCopy);
            this.notifyCompletion(false);
        }
    }

    /**
     * @return {@code true} once the task has been cancelled, either entirely or by cancelling its last segments
     */
    public boolean isCancelled() {
        return this.isCancelled;
    }

    /**
     * Sends a cancel command for the given segments to the source. This is best effort: a failure
     * to send is logged at debug level (with its cause) and otherwise ignored.
     *
     * @param cancelledSegments the segments named in the cancel command
     */
    private void sendCancelCommand(Set<Integer> cancelledSegments) {
        StateRequestCommand.Type requestType = this.applyState ? StateRequestCommand.Type.CANCEL_STATE_TRANSFER : StateRequestCommand.Type.CANCEL_CONSISTENCY_CHECK;
        StateRequestCommand cmd = this.commandsFactory.buildStateRequestCommand(requestType, this.rpcManager.getAddress(), this.topologyId, cancelledSegments);
        try {
            this.rpcManager.invokeRemotely(Collections.singleton(this.source), cmd, this.rpcManager.getDefaultRpcOptions(false));
        } catch (Exception e) {
            // Deliberately not rethrown, but keep the cause in the log so failures can be diagnosed.
            log.debugf(e, "Caught an exception while cancelling state transfer from node %s for segments %s", this.source, cancelledSegments);
        }
    }

    /**
     * Records that state for a segment was received. Only the last chunk of a segment that is
     * still requested marks the segment finished; when all requested segments are finished the
     * future is completed normally. Ignored when the task is cancelled.
     *
     * @param segmentId   the segment the received chunk belongs to
     * @param isLastChunk whether the chunk is the last one for that segment
     */
    public void onStateReceived(int segmentId, boolean isLastChunk) {
        if (this.isCancelled || !isLastChunk) {
            return;
        }
        boolean isCompleted = false;
        synchronized (this.segments) {
            if (this.segments.contains(segmentId)) {
                this.finishedSegments.add(segmentId);
                if (this.finishedSegments.size() == this.segments.size()) {
                    log.debugf("Finished receiving state for segments %s", this.segments);
                    isCompleted = true;
                }
            }
        }
        // Complete outside the lock so that future callbacks do not run while holding it.
        if (isCompleted) {
            this.notifyCompletion(true);
        }
    }

    /**
     * Completes the future: normally on success, otherwise exceptionally with a
     * {@link CancellationException}. Has no effect if the future is already completed.
     */
    private void notifyCompletion(boolean success) {
        if (success) {
            this.completionFuture.complete(null);
        } else {
            this.completionFuture.completeExceptionally(new CancellationException("Inbound transfer was cancelled"));
        }
    }

    /**
     * @return {@code true} if the future is done and was not completed exceptionally
     */
    public boolean isCompletedSuccessfully() {
        return this.completionFuture.isDone() && !this.completionFuture.isCompletedExceptionally();
    }

    /**
     * Completes the future exceptionally with a {@link CancellationException}, without sending a
     * cancel command to the source and without setting the cancelled flag.
     */
    public void terminate() {
        this.notifyCompletion(false);
    }

    @Override
    public String toString() {
        synchronized (this.segments) {
            return "InboundTransferTask{segments=" + this.segments + ", finishedSegments=" + this.finishedSegments + ", unfinishedSegments=" + this.getUnfinishedSegments() + ", source=" + this.source + ", isCancelled=" + this.isCancelled + ", completionFuture=" + this.completionFuture + ", topologyId=" + this.topologyId + ", timeout=" + this.timeout + ", cacheName=" + this.cacheName + '}';
        }
    }
}

