/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite.irac;

import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PrimitiveIterator;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.irac.IracCleanupKeyCommand;
import org.infinispan.commands.irac.IracRequestStateCommand;
import org.infinispan.commands.irac.IracStateResponseCommand;
import org.infinispan.commands.irac.IracTouchKeyCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.versioning.irac.IracVersionGenerator;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.XSiteResponse;
import org.infinispan.topology.CacheTopology;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ExponentialBackOff;
import org.infinispan.util.ExponentialBackOffImpl;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.infinispan.xsite.irac.IracManager;
import org.infinispan.xsite.status.DefaultTakeOfflineManager;
import org.infinispan.xsite.status.SiteState;
import org.infinispan.xsite.status.TakeOfflineManager;

@Scope(value=Scopes.NAMED_CACHE)
public class DefaultIracManager
implements IracManager,
Runnable {
    private static final Log log = LogFactory.getLog(DefaultIracManager.class);
    private static final boolean trace = log.isTraceEnabled();
    @Inject
    RpcManager rpcManager;
    @Inject
    Configuration config;
    @Inject
    TakeOfflineManager takeOfflineManager;
    @Inject
    ClusteringDependentLogic clusteringDependentLogic;
    @Inject
    CommandsFactory commandsFactory;
    @Inject
    IracVersionGenerator iracVersionGenerator;
    private final Map<Object, Object> updatedKeys = new ConcurrentHashMap<Object, Object>();
    private final Semaphore senderNotifier = new Semaphore(0);
    private volatile ExponentialBackOff backOff = new ExponentialBackOffImpl();
    private volatile boolean hasClear;
    private volatile Collection<XSiteBackup> asyncBackups;
    private volatile Thread sender;
    private volatile boolean running;

    private static Collection<XSiteBackup> asyncBackups(Configuration config, String localSiteName) {
        return config.sites().asyncBackupsStream().filter(bc -> !localSiteName.equals(bc.site())).map(bc -> new XSiteBackup(bc.site(), true, bc.replicationTimeout())).collect(Collectors.toList());
    }

    private static Stream<?> keyStream(WriteCommand command) {
        return command.getAffectedKeys().stream();
    }

    private static boolean backupToRemoteSite(WriteCommand command) {
        return !command.hasAnyFlag(FlagBitSets.SKIP_XSITE_BACKUP);
    }

    private static IntSet newIntSet(Address ignored) {
        return IntSets.mutableEmptySet();
    }

    @Start
    public void start() {
        Thread newSender;
        Thread oldSender;
        Transport transport = this.rpcManager.getTransport();
        transport.checkCrossSiteAvailable();
        String localSiteName = transport.localSiteName();
        this.asyncBackups = DefaultIracManager.asyncBackups(this.config, localSiteName);
        if (trace) {
            String b = this.asyncBackups.stream().map(XSiteBackup::getSiteName).collect(Collectors.joining(", "));
            log.tracef("Async remote sites found: %s", b);
        }
        if ((oldSender = this.sender) != null) {
            oldSender.interrupt();
        }
        this.senderNotifier.drainPermits();
        this.running = true;
        this.hasClear = false;
        this.sender = newSender = new Thread((Runnable)this, "irac-sender-thread-" + transport.getAddress());
        newSender.start();
    }

    @Stop
    public void stop() {
        this.running = false;
        Thread oldSender = this.sender;
        if (oldSender != null) {
            oldSender.interrupt();
        }
    }

    @Override
    public void trackUpdatedKey(Object key, Object lockOwner) {
        if (trace) {
            log.tracef("Tracking key for %s: %s", lockOwner, key);
        }
        this.updatedKeys.put(key, lockOwner);
        this.senderNotifier.release();
    }

    @Override
    public <K> void trackUpdatedKeys(Collection<K> keys, Object lockOwner) {
        if (trace) {
            log.tracef("Tracking keys for %s: %s", lockOwner, keys);
        }
        if (keys.isEmpty()) {
            return;
        }
        keys.forEach(key -> this.updatedKeys.put(key, lockOwner));
        this.senderNotifier.release();
    }

    @Override
    public void trackKeysFromTransaction(Stream<WriteCommand> modifications, GlobalTransaction lockOwner) {
        this.keysFromMods(modifications).forEach(key -> {
            if (trace) {
                log.tracef("Tracking key for %s: %s", lockOwner, key);
            }
            this.updatedKeys.put(key, lockOwner);
        });
        this.senderNotifier.release();
    }

    @Override
    public void trackClear() {
        if (trace) {
            log.trace("Tracking clear request");
        }
        this.hasClear = true;
        this.updatedKeys.clear();
        this.senderNotifier.release();
    }

    @Override
    public void cleanupKey(Object key, Object lockOwner, IracMetadata tombstone) {
        boolean removed = this.updatedKeys.remove(key, lockOwner);
        this.iracVersionGenerator.removeTombstone(key, tombstone);
        if (trace) {
            log.tracef("Removing key '%s'. LockOwner='%s', removed=%s", key, lockOwner, removed);
        }
    }

    @Override
    public void onTopologyUpdate(CacheTopology oldCacheTopology, CacheTopology newCacheTopology) {
        if (trace) {
            log.trace("[IRAC] Topology Updated. Checking pending keys.");
        }
        Address local = this.rpcManager.getAddress();
        if (!newCacheTopology.getMembers().contains(local)) {
            return;
        }
        IntSet addedSegments = IntSets.mutableCopyFrom(newCacheTopology.getWriteConsistentHash().getSegmentsForOwner(local));
        if (oldCacheTopology.getMembers().contains(local)) {
            addedSegments.removeAll(oldCacheTopology.getWriteConsistentHash().getSegmentsForOwner(local));
        }
        if (addedSegments.isEmpty()) {
            this.senderNotifier.release();
            return;
        }
        HashMap<Address, IntSet> primarySegments = new HashMap<Address, IntSet>();
        PrimitiveIterator.OfInt ofInt = addedSegments.iterator();
        while (ofInt.hasNext()) {
            int segment = (Integer)ofInt.next();
            Address primary = newCacheTopology.getWriteConsistentHash().locatePrimaryOwnerForSegment(segment);
            primarySegments.computeIfAbsent(primary, DefaultIracManager::newIntSet).add(segment);
        }
        primarySegments.forEach(this::sendStateRequest);
        this.senderNotifier.release();
    }

    @Override
    public void requestState(Address origin, IntSet segments) {
        this.updatedKeys.forEach((key, lockOwner) -> this.sendStateIfNeeded(origin, segments, key, lockOwner));
    }

    @Override
    public void receiveState(Object key, Object lockOwner, IracMetadata tombstone) {
        this.iracVersionGenerator.storeTombstoneIfAbsent(key, tombstone);
        this.updatedKeys.putIfAbsent(key, lockOwner);
        this.senderNotifier.release();
    }

    @Override
    public CompletionStage<Boolean> checkAndTrackExpiration(Object key) {
        if (trace) {
            log.tracef("Checking remote backup sites to see if key %s has been touched recently", key);
        }
        IracTouchKeyCommand command = this.commandsFactory.buildIracTouchCommand(key);
        AtomicBoolean expired = new AtomicBoolean(true);
        AggregateCompletionStage<AtomicBoolean> collector = CompletionStages.aggregateCompletionStage(expired);
        for (XSiteBackup backup : this.asyncBackups) {
            if (this.takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) {
                if (!trace) continue;
                log.tracef("Skipping %s as it is offline", backup.getSiteName());
                continue;
            }
            if (trace) {
                log.tracef("Sending irac touch key command to %s", backup);
            }
            XSiteResponse<Boolean> response = this.sendToRemoteSite(backup, command);
            collector.dependsOn(response.thenAccept(touched -> {
                if (touched.booleanValue()) {
                    if (trace) {
                        log.tracef("Key %s was recently touched on a remote site %s", key, backup);
                    }
                    expired.set(false);
                } else if (trace) {
                    log.tracef("Entry %s was expired on remote site %s", key, backup);
                }
            }));
        }
        return collector.freeze().thenApply(AtomicBoolean::get);
    }

    public void sendStateIfNeeded(Address origin, IntSet segments, Object key, Object lockOwner) {
        int segment = this.getSegment(key);
        if (!segments.contains(segment)) {
            return;
        }
        IracMetadata tombstone = this.iracVersionGenerator.getTombstone(key);
        IracStateResponseCommand cmd = this.commandsFactory.buildIracStateResponseCommand(key, lockOwner, tombstone);
        this.rpcManager.sendTo(origin, cmd, DeliverOrder.NONE);
    }

    public Stream<?> keysFromMods(Stream<WriteCommand> modifications) {
        return modifications.filter(WriteCommand::isSuccessful).filter(DefaultIracManager::backupToRemoteSite).flatMap(DefaultIracManager::keyStream).filter(this::isWriteOwner);
    }

    @Override
    public void run() {
        try {
            while (this.running) {
                this.senderNotifier.acquire();
                this.senderNotifier.drainPermits();
                this.periodicSend();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void setBackOff(ExponentialBackOff backOff) {
        this.backOff = Objects.requireNonNull(backOff);
    }

    public boolean isEmpty() {
        return this.updatedKeys.isEmpty();
    }

    private void sendStateRequest(Address primary, IntSet segments) {
        IracRequestStateCommand cmd = this.commandsFactory.buildIracRequestStateCommand(segments);
        this.rpcManager.sendTo(primary, cmd, DeliverOrder.NONE);
    }

    private ResponseResult awaitResponses(CompletionStage<Void> reply) throws InterruptedException {
        try {
            reply.toCompletableFuture().get();
            return ResponseResult.OK;
        }
        catch (ExecutionException e) {
            if (trace) {
                log.trace("IRAC update not successful.", e);
            }
            this.senderNotifier.release();
            return DefaultTakeOfflineManager.isCommunicationError(e) ? ResponseResult.NETWORK_EXCEPTION : ResponseResult.REMOTE_EXCEPTION;
        }
    }

    private void periodicSend() throws InterruptedException {
        if (trace) {
            log.tracef("[IRAC] Sending keys to remote site(s). Has clear? %s, keys: %s", this.hasClear, this.updatedKeys.keySet());
        }
        if (this.hasClear) {
            CompletionStage<Void> rsp = this.sendCommandToAllBackups(this.commandsFactory.buildIracClearKeysCommand());
            switch (this.awaitResponses(rsp)) {
                case REMOTE_EXCEPTION: {
                    this.backOff.reset();
                    return;
                }
                case NETWORK_EXCEPTION: {
                    this.backOff.backoffSleep();
                    return;
                }
                case OK: {
                    this.hasClear = false;
                    this.backOff.reset();
                }
            }
        }
        try {
            SendKeyTask task = new SendKeyTask();
            this.updatedKeys.forEach(task);
            task.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
        catch (Throwable t) {
            log.unexpectedErrorFromIrac(t);
        }
    }

    private <O> XSiteResponse<O> sendToRemoteSite(XSiteBackup backup, XSiteReplicateCommand<O> cmd) {
        XSiteResponse<O> rsp = this.rpcManager.invokeXSite(backup, cmd);
        this.takeOfflineManager.registerRequest(rsp);
        return rsp;
    }

    private void removeKey(Object key, int segmentId, Object lockOwner, IracMetadata tombstone) {
        if (trace) {
            log.tracef("Replication completed for key '%s'. Lock Owner='%s'", key, lockOwner);
        }
        DistributionInfo dInfo = this.getDistributionInfo(segmentId);
        IracCleanupKeyCommand cmd = this.commandsFactory.buildIracCleanupKeyCommand(key, lockOwner, tombstone);
        this.rpcManager.sendToMany(dInfo.writeOwners(), cmd, DeliverOrder.NONE);
        this.cleanupKey(key, lockOwner, tombstone);
    }

    private DistributionInfo getDistributionInfoKey(Object key) {
        return this.clusteringDependentLogic.getCacheTopology().getDistribution(key);
    }

    private DistributionInfo getDistributionInfo(int segmentId) {
        return this.clusteringDependentLogic.getCacheTopology().getDistribution(segmentId);
    }

    private int getSegment(Object key) {
        return this.clusteringDependentLogic.getCacheTopology().getSegment(key);
    }

    private boolean isWriteOwner(Object key) {
        return this.getDistributionInfoKey(key).isWriteOwner();
    }

    private CompletionStage<Void> sendCommandToAllBackups(XSiteReplicateCommand<Void> command) {
        if (command == null) {
            return CompletableFutures.completedNull();
        }
        AggregateCompletionStage<Void> collector = CompletionStages.aggregateCompletionStage();
        for (XSiteBackup backup : this.asyncBackups) {
            if (this.takeOfflineManager.getSiteState(backup.getSiteName()) == SiteState.OFFLINE) continue;
            collector.dependsOn(this.sendToRemoteSite(backup, command));
        }
        return collector.freeze();
    }

    private XSiteReplicateCommand<Void> buildRemoveCommand(CleanupTask cleanupTask) {
        Object key = cleanupTask.key;
        IracMetadata metadata = this.iracVersionGenerator.getTombstone(key);
        if (metadata == null) {
            return null;
        }
        cleanupTask.tombstone = metadata;
        return this.commandsFactory.buildIracRemoveKeyCommand(key, metadata);
    }

    private CompletionStage<InternalCacheEntry<Object, Object>> fetchEntry(Object key, int segmentId) {
        return this.clusteringDependentLogic.getEntryLoader().loadAndStoreInDataContainer(key, segmentId);
    }

    private static enum ResponseResult {
        OK,
        REMOTE_EXCEPTION,
        NETWORK_EXCEPTION;

    }

    private class CleanupTask
    implements Runnable {
        final Object key;
        final int segmentId;
        final Object lockOwner;
        volatile IracMetadata tombstone;

        private CleanupTask(Object key, int segmentId, Object lockOwner) {
            this.key = key;
            this.segmentId = segmentId;
            this.lockOwner = lockOwner;
        }

        @Override
        public void run() {
            DefaultIracManager.this.removeKey(this.key, this.segmentId, this.lockOwner, this.tombstone);
        }
    }

    private class SendKeyTask
    implements BiConsumer<Object, Object> {
        private final List<CompletionStage<Void>> responses = new LinkedList<CompletionStage<Void>>();
        private final List<CleanupTask> cleanupTasks = new LinkedList<CleanupTask>();

        private SendKeyTask() {
        }

        @Override
        public void accept(Object key, Object lockOwner) {
            DistributionInfo dInfo = DefaultIracManager.this.getDistributionInfoKey(key);
            if (!dInfo.isPrimary()) {
                return;
            }
            if (!dInfo.isWriteOwner()) {
                this.cleanupTasks.add(new CleanupTask(key, dInfo.segmentId(), lockOwner));
                return;
            }
            if (!dInfo.isReadOwner()) {
                return;
            }
            CleanupTask cleanupTask = new CleanupTask(key, dInfo.segmentId(), lockOwner);
            CompletionStage<Void> rsp = DefaultIracManager.this.fetchEntry(key, dInfo.segmentId()).thenApply(lEntry -> lEntry == null ? DefaultIracManager.this.buildRemoveCommand(cleanupTask) : DefaultIracManager.this.commandsFactory.buildIracPutKeyCommand(lEntry)).thenCompose(x$0 -> DefaultIracManager.this.sendCommandToAllBackups(x$0)).thenRun(cleanupTask);
            this.responses.add(rsp);
        }

        void await() throws InterruptedException {
            this.cleanupTasks.forEach(CleanupTask::run);
            boolean needsBackoff = false;
            for (CompletionStage<Void> rsp : this.responses) {
                if (DefaultIracManager.this.awaitResponses(rsp) != ResponseResult.NETWORK_EXCEPTION) continue;
                needsBackoff = true;
            }
            if (needsBackoff) {
                DefaultIracManager.this.backOff.backoffSleep();
            } else {
                DefaultIracManager.this.backOff.reset();
            }
        }
    }
}

