/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.xsite;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import javax.transaction.TransactionManager;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.cache.impl.InvocationHelper;
import org.infinispan.commands.AbstractVisitor;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.functional.WriteOnlyManyEntriesCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.RollbackCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.IracPutKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.RemoveExpiredCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.commons.IllegalLifecycleStateException;
import org.infinispan.commons.time.TimeService;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.functional.FunctionalMap;
import org.infinispan.functional.impl.FunctionalMapImpl;
import org.infinispan.functional.impl.WriteOnlyMapImpl;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.marshall.core.MarshallableFunctions;
import org.infinispan.metadata.Metadata;
import org.infinispan.metadata.impl.IracMetadata;
import org.infinispan.metadata.impl.PrivateMetadata;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.RpcException;
import org.infinispan.remoting.responses.CacheNotFoundResponse;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.ValidResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.ResponseCollectors;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.transaction.impl.LocalTransaction;
import org.infinispan.transaction.impl.TransactionTable;
import org.infinispan.transaction.xa.GlobalTransaction;
import org.infinispan.util.ByteString;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.CompletionStages;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.xsite.BackupReceiver;
import org.infinispan.xsite.commands.XSiteStateTransferFinishReceiveCommand;
import org.infinispan.xsite.commands.XSiteStateTransferStartReceiveCommand;
import org.infinispan.xsite.irac.DiscardUpdateException;
import org.infinispan.xsite.statetransfer.XSiteState;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;

@Scope(value=Scopes.NAMED_CACHE)
public class ClusteredCacheBackupReceiver
implements BackupReceiver {
    private static final Log log = LogFactory.getLog(ClusteredCacheBackupReceiver.class);
    // This flag only guards log.tracef(...) calls, so it follows the TRACE level (as the nested
    // TransactionHandler does). Deriving it from DEBUG made every guarded call build its arguments at
    // DEBUG level for a message logged at TRACE level.
    private static final boolean trace = log.isTraceEnabled();
    /**
     * Completion handler used by the IRAC put/remove paths: completes normally when there is no failure or
     * when the failure is a {@link DiscardUpdateException}; any other failure is rethrown as a completion
     * exception.
     */
    private static final BiFunction<Object, Throwable, Void> CHECK_EXCEPTION = (o, throwable) -> {
        if (throwable == null || throwable instanceof DiscardUpdateException) {
            return null;
        }
        throw CompletableFutures.asCompletionException(throwable);
    };
    // Components injected by the component registry.
    @Inject
    Cache<Object, Object> cache;
    @Inject
    TimeService timeService;
    @Inject
    CommandsFactory commandsFactory;
    @Inject
    KeyPartitioner keyPartitioner;
    @Inject
    InvocationHelper invocationHelper;
    @Inject
    InvocationContextFactory invocationContextFactory;
    @Inject
    RpcManager rpcManager;
    @Inject
    ClusteringDependentLogic clusteringDependentLogic;
    // Name of the cache, used when copying state transfer commands for this cache.
    private final ByteString cacheName;
    // Visitor that handles remote-site commands; assigned in start().
    private volatile DefaultHandler defaultHandler;

    public ClusteredCacheBackupReceiver(String cacheName) {
        this.cacheName = ByteString.fromString(cacheName);
    }

    /**
     * Builds the command handlers once the injected components are available.
     * <p>
     * The transaction table and the {@link BlockingManager} are looked up through the cache's
     * {@link ComponentRegistry} rather than being injected as fields.
     */
    @Start
    public void start() {
        final ComponentRegistry registry = cache.getAdvancedCache().getComponentRegistry();
        final TransactionHandler transactionHandler = new TransactionHandler(cache, registry.getTransactionTable());
        final BlockingManager blockingManager = registry.getComponent(BlockingManager.class);
        defaultHandler = new DefaultHandler(transactionHandler, blockingManager);
    }

    /**
     * Sends the "start receiving state transfer" command to every node of the local site, including this one.
     *
     * @param command the received command; a copy bound to this cache's name is what gets invoked
     * @return a stage that completes when both the remote and the local invocations complete
     */
    @Override
    public CompletionStage<Void> handleStartReceivingStateTransfer(XSiteStateTransferStartReceiveCommand command) {
        final CacheRpcCommand copy = XSiteStateTransferStartReceiveCommand.copyForCache(command, cacheName);
        return invokeRemotelyInLocalSite(copy);
    }

    /**
     * Sends the "finish receiving state transfer" command to every node of the local site, including this one.
     *
     * @param command the received command; a copy bound to this cache's name is what gets invoked
     * @return a stage that completes when both the remote and the local invocations complete
     */
    @Override
    public CompletionStage<Void> handleEndReceivingStateTransfer(XSiteStateTransferFinishReceiveCommand command) {
        final CacheRpcCommand copy = XSiteStateTransferFinishReceiveCommand.copyForCache(command, cacheName);
        return invokeRemotelyInLocalSite(copy);
    }

    /**
     * Wraps the given IRAC metadata in a freshly built {@link PrivateMetadata}.
     *
     * @param metadata the IRAC metadata to carry
     * @return the private metadata produced by the builder
     */
    private static PrivateMetadata internalMetadata(IracMetadata metadata) {
        final PrivateMetadata.Builder builder = new PrivateMetadata.Builder();
        return builder.iracMetadata(metadata).build();
    }

    /**
     * Applies a chunk of cross-site state by splitting it per primary owner and forwarding each part.
     * <p>
     * Entries whose primary owner is another node are sent to that node; entries owned by the local node
     * are applied through a local invocation. Each part is tracked by a {@link StatePushTask}, and the
     * returned stage completes once all of them complete.
     *
     * @param cmd the state push command holding the chunk and its timeout (read in milliseconds)
     * @return an exceptionally completed future if the cache does not allow invocations; otherwise a stage
     *         that completes when every task completes and that re-checks the cache status at the end
     */
    @Override
    public CompletionStage<Void> handleStateTransferState(XSiteStatePushCommand cmd) {
        CompletableFuture<Void> allowInvocation = this.checkInvocationAllowedFuture();
        if (allowInvocation != null) {
            return allowInvocation;
        }
        final long endTime = this.timeService.expectedEndTime(cmd.getTimeout(), TimeUnit.MILLISECONDS);
        final Map<Address, List<XSiteState>> primaryOwnersChunks = new HashMap<>();
        final Address localAddress = this.rpcManager.getAddress();
        if (trace) {
            log.tracef("Received X-Site state transfer '%s'. Splitting by primary owner.", cmd);
        }
        // Group the received entries by the primary owner of each key.
        for (XSiteState state : cmd.getChunk()) {
            Address primaryOwner = this.clusteringDependentLogic.getCacheTopology().getDistribution(state.key()).primary();
            primaryOwnersChunks.computeIfAbsent(primaryOwner, k -> new LinkedList<>()).add(state);
        }
        // The local part is taken out first so the loop below only sees remote owners.
        final List<XSiteState> localChunks = primaryOwnersChunks.remove(localAddress);
        AggregateCompletionStage<Void> cf = CompletionStages.aggregateCompletionStage();
        for (Map.Entry<Address, List<XSiteState>> entry : primaryOwnersChunks.entrySet()) {
            List<XSiteState> remoteChunk = entry.getValue();
            if (remoteChunk == null || remoteChunk.isEmpty()) {
                continue;
            }
            if (trace) {
                log.tracef("Node '%s' will apply %s", entry.getKey(), remoteChunk);
            }
            StatePushTask task = new StatePushTask(remoteChunk, entry.getKey(), endTime);
            task.executeRemote();
            cf.dependsOn(task);
        }
        // Each task holds its own reference to its chunk, so the map is no longer needed.
        primaryOwnersChunks.clear();
        if (trace) {
            log.tracef("Local node '%s' will apply %s", localAddress, localChunks);
        }
        if (localChunks != null) {
            StatePushTask task = new StatePushTask(localChunks, localAddress, endTime);
            task.executeLocal();
            cf.dependsOn(task);
        }
        return cf.freeze().thenApply(this::assertAllowInvocationFunction);
    }

    /**
     * Dispatches a command received from a remote site to the default handler.
     * <p>
     * The {@code preserveOrder} flag is only checked by an assertion and is expected to be {@code false}.
     * Anything thrown while dispatching is returned as an exceptionally completed future.
     *
     * @param command       the command to apply
     * @param preserveOrder expected to be {@code false}
     * @param <O>           the result type expected by the caller
     * @return the stage returned by the handler, or an exceptionally completed future on failure
     */
    @Override
    @SuppressWarnings("unchecked")
    public final <O> CompletionStage<O> handleRemoteCommand(VisitableCommand command, boolean preserveOrder) {
        CompletionStage<O> outcome;
        try {
            assert (!preserveOrder);
            outcome = (CompletionStage<O>) command.acceptVisitor(null, defaultHandler);
        } catch (Throwable failure) {
            outcome = CompletableFutures.completedExceptionFuture(failure);
        }
        return outcome;
    }

    /**
     * Applies an IRAC update for a single key.
     * <p>
     * The command is invoked asynchronously in a single-key non-transactional context. The result is
     * filtered by {@code CHECK_EXCEPTION}, so a {@link DiscardUpdateException} is not reported as a failure.
     *
     * @param key          the key to write
     * @param value        the value to store
     * @param metadata     the entry metadata
     * @param iracMetadata the IRAC metadata, wrapped as private metadata of the command
     * @return a stage that completes when the command finishes
     */
    @Override
    public CompletionStage<Void> putKeyValue(Object key, Object value, Metadata metadata, IracMetadata iracMetadata) {
        final int keySegment = segment(key);
        final PrivateMetadata privateMetadata = internalMetadata(iracMetadata);
        final IracPutKeyValueCommand update =
                commandsFactory.buildIracPutKeyValueCommand(key, keySegment, value, metadata, privateMetadata);
        final InvocationContext context = invocationContextFactory.createSingleKeyNonTxInvocationContext();
        return invocationHelper.invokeAsync(context, update).handle(CHECK_EXCEPTION);
    }

    /**
     * Applies an IRAC removal for a single key.
     * <p>
     * The removal is sent as an {@link IracPutKeyValueCommand} built with a {@code null} value and
     * {@code null} metadata. The result is filtered by {@code CHECK_EXCEPTION}, so a
     * {@link DiscardUpdateException} is not reported as a failure.
     *
     * @param key          the key to remove
     * @param iracMetadata the IRAC metadata, wrapped as private metadata of the command
     * @return a stage that completes when the command finishes
     */
    @Override
    public CompletionStage<Void> removeKey(Object key, IracMetadata iracMetadata) {
        final int keySegment = segment(key);
        final PrivateMetadata privateMetadata = internalMetadata(iracMetadata);
        final IracPutKeyValueCommand removal =
                commandsFactory.buildIracPutKeyValueCommand(key, keySegment, null, null, privateMetadata);
        final InvocationContext context = invocationContextFactory.createSingleKeyNonTxInvocationContext();
        return invocationHelper.invokeAsync(context, removal).handle(CHECK_EXCEPTION);
    }

    /**
     * Checks whether the cache currently accepts invocations.
     *
     * @param <T> the result type of the returned future
     * @return {@code null} when invocations are allowed; otherwise a future completed exceptionally with an
     *         {@link IllegalLifecycleStateException}
     */
    private <T> CompletableFuture<T> checkInvocationAllowedFuture() {
        final ComponentStatus current = cache.getStatus();
        if (current.allowInvocations()) {
            return null;
        }
        final String message = "Cache is stopping or terminated: " + current;
        return CompletableFutures.completedExceptionFuture(new IllegalLifecycleStateException(message));
    }

    /**
     * Continuation that fails the stage if the cache no longer accepts invocations.
     *
     * @param ignoredRetVal the previous stage's result; not used
     * @return always {@code null}
     * @throws java.util.concurrent.CompletionException wrapping an {@link IllegalLifecycleStateException}
     *         when the cache status does not allow invocations
     */
    private Void assertAllowInvocationFunction(Object ignoredRetVal) {
        final ComponentStatus current = cache.getStatus();
        if (current.allowInvocations()) {
            return null;
        }
        final String message = "Cache is stopping or terminated: " + current;
        throw CompletableFutures.asCompletionException(new IllegalLifecycleStateException(message));
    }

    /**
     * Builds a state push command carrying the given entries, with {@code 0L} passed as the second
     * argument of the factory method.
     *
     * @param stateList the entries to send
     * @return the new command
     */
    private XSiteStatePushCommand newStatePushCommand(List<XSiteState> stateList) {
        final XSiteState[] states = stateList.toArray(new XSiteState[0]);
        return commandsFactory.buildXSiteStatePushCommand(states, 0L);
    }

    /**
     * Clears the cache asynchronously through the handler's backup cache.
     *
     * @return the stage returned by {@code clearAsync()}
     */
    @Override
    public CompletionStage<Void> clearKeys() {
        final AdvancedCache<Object, Object> backup = defaultHandler.cache();
        return backup.clearAsync();
    }

    /**
     * Touches the entry for the given key on the advanced cache, passing {@code false} as the second argument.
     *
     * @param key the key to touch
     * @return the stage returned by the cache's {@code touch} call
     */
    @Override
    public CompletionStage<Boolean> touchEntry(Object key) {
        final AdvancedCache<Object, Object> advanced = cache.getAdvancedCache();
        return advanced.touch(key, false);
    }

    /**
     * Invokes the command on all other nodes through the RPC manager and also on the local node.
     *
     * @param command the command to run
     * @return a stage that completes when both the remote and the local invocations have completed
     */
    private CompletionStage<Void> invokeRemotelyInLocalSite(CacheRpcCommand command) {
        final CompletionStage<Map<Address, Response>> remoteStage =
                rpcManager.invokeCommandOnAll(command, MapResponseCollector.validOnly(), rpcManager.getSyncRpcOptions());
        final CompletionStage<Response> localStage = LocalInvocation.newInstanceFromCache(cache, command).callAsync();
        final CompletableFuture<?>[] pending = {
                remoteStage.toCompletableFuture(),
                localStage.toCompletableFuture()
        };
        return CompletableFuture.allOf(pending);
    }

    /**
     * Looks up the segment of a key using the injected {@link KeyPartitioner}.
     *
     * @param key the key
     * @return the segment reported by the partitioner
     */
    private int segment(Object key) {
        final int keySegment = keyPartitioner.getSegment(key);
        return keySegment;
    }

    /**
     * Future tracking the application of one chunk of state on one node.
     * <p>
     * The task is its own response collector and its own completion handler: the outcome of each attempt is
     * passed to {@link #apply(Response, Throwable)}, which completes the future, retries remotely, or
     * falls back to applying the chunk locally.
     */
    private class StatePushTask extends CompletableFuture<Void>
            implements ResponseCollector<Response>, BiFunction<Response, Throwable, Void> {
        private final List<XSiteState> chunk;
        private final Address address;
        private final long endTime;

        private StatePushTask(List<XSiteState> chunk, Address address, long endTime) {
            this.chunk = chunk;
            this.address = address;
            this.endTime = endTime;
        }

        /**
         * Handles the outcome of one attempt.
         *
         * @param response  the response of the attempt, if it did not fail
         * @param throwable the failure of the attempt, or {@code null}
         * @return always {@code null}
         */
        @Override
        public Void apply(Response response, Throwable throwable) {
            if (throwable == null) {
                if (response != CacheNotFoundResponse.INSTANCE) {
                    // Successful attempt: the task is done.
                    complete(null);
                    return null;
                }
                if (trace) {
                    log.tracef("Cache not found in node '%s'. Retrying locally!", address);
                }
                if (!isShouldGiveUp()) {
                    executeLocal();
                }
                return null;
            }

            if (isShouldGiveUp()) {
                return null;
            }
            // Retry on the same node only while it is still a member and it is not the local node.
            boolean retryOnTarget = rpcManager.getMembers().contains(address) && !rpcManager.getAddress().equals(address);
            if (retryOnTarget) {
                if (trace) {
                    log.tracef(throwable, "An exception was sent by %s. Retrying!", address);
                }
                executeRemote();
                return null;
            }
            if (trace) {
                log.tracef(throwable, "An exception was sent by %s. Retrying locally!", address);
            }
            executeLocal();
            return null;
        }

        /**
         * Accepts valid and cache-not-found responses; any other response is turned into an exception.
         *
         * @param sender   the node that sent the response
         * @param response the response received
         * @return the response itself when it is accepted
         */
        @Override
        public Response addResponse(Address sender, Response response) {
            boolean accepted = response instanceof ValidResponse || response instanceof CacheNotFoundResponse;
            if (accepted) {
                return response;
            }
            Throwable cause;
            if (response instanceof ExceptionResponse) {
                cause = ((ExceptionResponse) response).getException();
            } else {
                cause = new RpcException("Unknown response type: " + response);
            }
            throw ResponseCollectors.wrapRemoteException(sender, cause);
        }

        /**
         * @return always {@code null}
         */
        @Override
        public Response finish() {
            return null;
        }

        /** Sends the chunk to {@link #address} and routes the outcome back to this task. */
        private void executeRemote() {
            RpcOptions options = rpcManager.getSyncRpcOptions();
            XSiteStatePushCommand push = newStatePushCommand(chunk);
            rpcManager.invokeCommand(address, push, this, options).handle(this);
        }

        /** Applies the chunk through a local invocation and routes the outcome back to this task. */
        private void executeLocal() {
            XSiteStatePushCommand push = newStatePushCommand(chunk);
            LocalInvocation.newInstanceFromCache(cache, push).callAsync().handle(this);
        }

        /**
         * Fails the task if the cache no longer allows invocations or the deadline has expired.
         *
         * @return {@code true} if the task was completed exceptionally and must not be retried
         */
        private boolean isShouldGiveUp() {
            ComponentStatus current = cache.getStatus();
            Throwable reason = null;
            if (!current.allowInvocations()) {
                reason = new IllegalLifecycleStateException("Cache is stopping or terminated: " + current);
            } else if (timeService.isTimeExpired(endTime)) {
                reason = new TimeoutException("Unable to apply state in the time limit.");
            }
            if (reason == null) {
                return false;
            }
            completeExceptionally(reason);
            return true;
        }
    }

    /**
     * Visitor that applies the modifications of a remote-site transaction to the backup cache.
     * <p>
     * Writes go through {@link #backupCache}, which is the given cache with its storage media type and the
     * {@link Flag#IGNORE_RETURN_VALUES} and {@link Flag#SKIP_XSITE_BACKUP} flags. For two-phase
     * transactions, the local transaction started at prepare time is remembered in
     * {@link #remote2localTx} until the matching commit or rollback arrives.
     */
    private static final class TransactionHandler
    extends AbstractVisitor {
        private static final Log log = LogFactory.getLog(TransactionHandler.class);
        private static final boolean trace = log.isTraceEnabled();
        // Remote global transaction -> global transaction of the local transaction that replays it.
        private final ConcurrentMap<GlobalTransaction, GlobalTransaction> remote2localTx;
        private final AdvancedCache<Object, Object> backupCache;
        private final FunctionalMap.WriteOnlyMap<Object, Object> writeOnlyMap;
        // May be null; see isTransactional().
        private final TransactionTable transactionTable;

        TransactionHandler(Cache<Object, Object> backup, TransactionTable transactionTable) {
            this.backupCache = backup.getAdvancedCache().withStorageMediaType().withFlags(Flag.IGNORE_RETURN_VALUES, Flag.SKIP_XSITE_BACKUP);
            this.writeOnlyMap = WriteOnlyMapImpl.create(FunctionalMapImpl.create(this.backupCache));
            this.remote2localTx = new ConcurrentHashMap<GlobalTransaction, GlobalTransaction>();
            this.transactionTable = transactionTable;
        }

        /**
         * Applies an unconditional put on the backup cache.
         *
         * @throws UnsupportedOperationException if the command is conditional
         */
        @Override
        public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
            if (command.isConditional()) {
                throw new UnsupportedOperationException();
            }
            this.backupCache.put(command.getKey(), command.getValue(), command.getMetadata());
            return null;
        }

        /**
         * Applies an unconditional remove on the backup cache.
         *
         * @throws UnsupportedOperationException if the command is conditional
         */
        @Override
        public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) {
            if (command.isConditional()) {
                throw new UnsupportedOperationException();
            }
            this.backupCache.remove(command.getKey());
            return null;
        }

        /**
         * Writes all entries of the command through the write-only functional map and waits for completion.
         */
        @Override
        public Object visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) {
            CompletableFuture<Void> future = this.writeOnlyMap.evalMany(command.getArguments(), MarshallableFunctions.setInternalCacheValueConsumer());
            return future.join();
        }

        /**
         * Replays the modifications of a prepare command.
         * <p>
         * On a transactional cache the modifications are replayed inside a new local transaction; otherwise
         * they are replayed directly. The two paths are mutually exclusive: replaying directly after the
         * transactional replay would apply every modification a second time, outside the transaction.
         *
         * @param command the prepare command received from the remote site
         * @throws IllegalStateException if the cache is transactional and the command has no modifications
         * @throws java.util.concurrent.CompletionException wrapping any failure raised while replaying
         */
        void handlePrepareCommand(PrepareCommand command) {
            if (!this.isTransactional()) {
                try {
                    this.replayModifications(command);
                }
                catch (Throwable throwable) {
                    throw CompletableFutures.asCompletionException(throwable);
                }
                return;
            }
            // A transactional prepare without modifications is rejected rather than replayed.
            if (!command.hasModifications()) {
                throw new IllegalStateException("TxInvocationContext has no modifications!");
            }
            try {
                this.replayModificationsInTransaction(command, command.isOnePhaseCommit());
            }
            catch (Throwable throwable) {
                throw CompletableFutures.asCompletionException(throwable);
            }
        }

        /**
         * Commits the local transaction associated with the remote transaction of the command. On a
         * non-transactional cache the request is only logged.
         */
        void handleCommitCommand(CommitCommand command) {
            if (!this.isTransactional()) {
                log.cannotRespondToCommit(command.getGlobalTransaction(), this.backupCache.getName());
            } else {
                if (trace) {
                    log.tracef("Committing remote transaction %s", command.getGlobalTransaction());
                }
                try {
                    this.completeTransaction(command.getGlobalTransaction(), true);
                }
                catch (Throwable throwable) {
                    throw CompletableFutures.asCompletionException(throwable);
                }
            }
        }

        /**
         * Rolls back the local transaction associated with the remote transaction of the command. On a
         * non-transactional cache the request is only logged.
         */
        void handleRollbackCommand(RollbackCommand command) {
            if (!this.isTransactional()) {
                log.cannotRespondToRollback(command.getGlobalTransaction(), this.backupCache.getName());
            } else {
                if (trace) {
                    log.tracef("Rolling back remote transaction %s", command.getGlobalTransaction());
                }
                try {
                    this.completeTransaction(command.getGlobalTransaction(), false);
                }
                catch (Throwable throwable) {
                    throw CompletableFutures.asCompletionException(throwable);
                }
            }
        }

        /** Any command without a dedicated visit method is rejected. */
        @Override
        protected Object handleDefault(InvocationContext ctx, VisitableCommand command) {
            throw new UnsupportedOperationException();
        }

        /** @return {@code true} when a transaction table was supplied to the constructor */
        private boolean isTransactional() {
            return this.transactionTable != null;
        }

        /**
         * Resumes the local transaction mapped to the given remote transaction and commits it or rolls it
         * back. The mapping is removed before anything else is attempted.
         *
         * @param globalTransaction the remote-site transaction
         * @param commit            {@code true} to commit, {@code false} to roll back
         * @throws Throwable if no mapping or no local transaction is found, or the transaction manager fails
         */
        private void completeTransaction(GlobalTransaction globalTransaction, boolean commit) throws Throwable {
            GlobalTransaction localTxId = this.remote2localTx.remove(globalTransaction);
            if (localTxId == null) {
                throw Log.XSITE.unableToFindRemoteSiteTransaction(globalTransaction);
            }
            LocalTransaction localTx = this.transactionTable.getLocalTransaction(localTxId);
            if (localTx == null) {
                throw Log.XSITE.unableToFindLocalTransactionFromRemoteSiteTransaction(globalTransaction);
            }
            TransactionManager txManager = this.txManager();
            txManager.resume(localTx.getTransaction());
            if (!localTx.isEnlisted()) {
                if (trace) {
                    log.tracef("%s isn't enlisted! Removing it manually.", localTx);
                }
                this.transactionTable.removeLocalTransaction(localTx);
            }
            if (commit) {
                txManager.commit();
            } else {
                txManager.rollback();
            }
        }

        /**
         * Replays the modifications inside a new local transaction.
         * <p>
         * With one-phase commit, the transaction is committed if the replay succeeded and rolled back
         * otherwise. With two-phase commit, the transaction is recorded in {@link #remote2localTx} and
         * suspended until the commit or rollback command arrives.
         *
         * @param command        the prepare command whose modifications are replayed
         * @param onePhaseCommit whether the transaction must be finished immediately
         * @throws Throwable any failure raised by the replay or by the transaction manager
         */
        private void replayModificationsInTransaction(PrepareCommand command, boolean onePhaseCommit) throws Throwable {
            TransactionManager tm = this.txManager();
            boolean replaySuccessful = false;
            try {
                tm.begin();
                this.replayModifications(command);
                replaySuccessful = true;
            }
            finally {
                LocalTransaction localTx = this.transactionTable.getLocalTransaction(tm.getTransaction());
                // Nothing to finish or suspend when no local transaction is registered.
                if (localTx != null) {
                    localTx.setFromRemoteSite(true);
                    if (onePhaseCommit) {
                        if (replaySuccessful) {
                            if (trace) {
                                log.tracef("Committing remotely originated tx %s as it is 1PC", command.getGlobalTransaction());
                            }
                            tm.commit();
                        } else {
                            if (trace) {
                                log.tracef("Rolling back remotely originated tx %s", command.getGlobalTransaction());
                            }
                            tm.rollback();
                        }
                    } else {
                        this.remote2localTx.put(command.getGlobalTransaction(), localTx.getGlobalTransaction());
                        tm.suspend();
                    }
                }
            }
        }

        private TransactionManager txManager() {
            return this.backupCache.getTransactionManager();
        }

        /** Applies every modification of the command, in order, by visiting it with this handler. */
        private void replayModifications(PrepareCommand command) throws Throwable {
            for (WriteCommand c : command.getModifications()) {
                c.acceptVisitor(null, this);
            }
        }
    }

    /**
     * Visitor that handles commands received from a remote site.
     * <p>
     * Plain writes are applied asynchronously on the transaction handler's backup cache. Prepare, commit
     * and rollback are delegated to the {@link TransactionHandler} through the {@link BlockingManager}.
     * Any other command is rejected.
     */
    private static class DefaultHandler extends AbstractVisitor {
        final TransactionHandler txHandler;
        final BlockingManager blockingManager;

        private DefaultHandler(TransactionHandler txHandler, BlockingManager blockingManager) {
            this.txHandler = txHandler;
            this.blockingManager = blockingManager;
        }

        /** Stores the command's key, value and metadata with an asynchronous put. */
        @Override
        public CompletionStage<Object> visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) {
            AdvancedCache<Object, Object> target = cache();
            return target.putAsync(command.getKey(), command.getValue(), command.getMetadata());
        }

        /** Removes the command's key with an asynchronous remove. */
        @Override
        public CompletionStage<Object> visitRemoveCommand(InvocationContext ctx, RemoveCommand command) {
            AdvancedCache<Object, Object> target = cache();
            return target.removeAsync(command.getKey());
        }

        /**
         * Removes an entry expired by max-idle.
         *
         * @throws UnsupportedOperationException if the command is not a max-idle expiration
         */
        @Override
        public Object visitRemoveExpiredCommand(InvocationContext ctx, RemoveExpiredCommand command) {
            if (command.isMaxIdle()) {
                return cache().removeMaxIdleExpired(command.getKey(), command.getValue());
            }
            throw new UnsupportedOperationException("Lifespan based expiration is not supported for xsite");
        }

        /** Writes all entries of the command through the write-only functional map. */
        @Override
        public CompletionStage<Void> visitWriteOnlyManyEntriesCommand(InvocationContext ctx, WriteOnlyManyEntriesCommand command) {
            FunctionalMap.WriteOnlyMap<Object, Object> writeOnly = fMap();
            return writeOnly.evalMany(command.getArguments(), MarshallableFunctions.setInternalCacheValueConsumer());
        }

        /** Clears the backup cache asynchronously. */
        @Override
        public final CompletionStage<Void> visitClearCommand(InvocationContext ctx, ClearCommand command) {
            AdvancedCache<Object, Object> target = cache();
            return target.clearAsync();
        }

        /** Hands the prepare to the transaction handler via the blocking manager. */
        @Override
        public CompletionStage<Void> visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) {
            Runnable prepare = () -> txHandler.handlePrepareCommand(command);
            return blockingManager.runBlocking(prepare, command.getCommandId());
        }

        /** Hands the commit to the transaction handler via the blocking manager. */
        @Override
        public CompletionStage<Void> visitCommitCommand(TxInvocationContext ctx, CommitCommand command) {
            Runnable commit = () -> txHandler.handleCommitCommand(command);
            return blockingManager.runBlocking(commit, command.getCommandId());
        }

        /** Hands the rollback to the transaction handler via the blocking manager. */
        @Override
        public CompletionStage<Void> visitRollbackCommand(TxInvocationContext ctx, RollbackCommand command) {
            Runnable rollback = () -> txHandler.handleRollbackCommand(command);
            return blockingManager.runBlocking(rollback, command.getCommandId());
        }

        /** Any command without a dedicated visit method is rejected. */
        @Override
        protected final Object handleDefault(InvocationContext ctx, VisitableCommand command) {
            throw new UnsupportedOperationException();
        }

        /** @return the backup cache owned by the transaction handler */
        private AdvancedCache<Object, Object> cache() {
            return txHandler.backupCache;
        }

        /** @return the write-only functional map owned by the transaction handler */
        private FunctionalMap.WriteOnlyMap<Object, Object> fMap() {
            return txHandler.writeOnlyMap;
        }
    }
}

