/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.stream.impl.tx;

import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.infinispan.CacheStream;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.impl.LocalTxInvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.remoting.transport.Address;
import org.infinispan.stream.impl.AbstractCacheStream;
import org.infinispan.stream.impl.DistributedCacheStream;
import org.infinispan.stream.impl.DistributedDoubleCacheStream;
import org.infinispan.stream.impl.DistributedIntCacheStream;
import org.infinispan.stream.impl.DistributedLongCacheStream;
import org.infinispan.stream.impl.tx.TxClusterStreamManager;
import org.infinispan.stream.impl.tx.TxDistributedDoubleCacheStream;
import org.infinispan.stream.impl.tx.TxDistributedIntCacheStream;
import org.infinispan.stream.impl.tx.TxDistributedLongCacheStream;

public class TxDistributedCacheStream<R>
extends DistributedCacheStream<R> {
    private final Address localAddress;
    private final LocalTxInvocationContext ctx;
    private final ConsistentHash hash;

    public <K, V> TxDistributedCacheStream(Address localAddress, boolean parallel, DistributionManager dm, Supplier<CacheStream<CacheEntry<K, V>>> supplier, TxClusterStreamManager<?> csm, boolean includeLoader, int distributedBatchSize, Executor executor, ComponentRegistry registry, LocalTxInvocationContext ctx) {
        super(localAddress, parallel, dm, supplier, csm, includeLoader, distributedBatchSize, executor, registry);
        this.localAddress = localAddress;
        this.hash = dm.getWriteConsistentHash();
        this.ctx = ctx;
    }

    public <K, V> TxDistributedCacheStream(Address localAddress, boolean parallel, DistributionManager dm, Supplier<CacheStream<CacheEntry<K, V>>> supplier, TxClusterStreamManager<?> csm, boolean includeLoader, int distributedBatchSize, Executor executor, ComponentRegistry registry, Function<? super CacheEntry<K, V>, R> function, LocalTxInvocationContext ctx) {
        super(localAddress, parallel, dm, supplier, csm, includeLoader, distributedBatchSize, executor, registry, function);
        this.localAddress = localAddress;
        this.hash = dm.getWriteConsistentHash();
        this.ctx = ctx;
    }

    TxDistributedCacheStream(AbstractCacheStream other, Address localAddress, ConsistentHash hash, LocalTxInvocationContext ctx) {
        super(other);
        this.localAddress = localAddress;
        this.hash = hash;
        this.ctx = ctx;
    }

    @Override
    protected Supplier<Stream<CacheEntry>> supplierForSegments(ConsistentHash ch, Set<Integer> targetSegments, Set<Object> excludedKeys, boolean primaryOnly) {
        return () -> {
            Supplier<Stream<CacheEntry>> supplier = super.supplierForSegments(ch, targetSegments, excludedKeys, primaryOnly);
            Set set = this.ctx.getLookedUpEntries().values().stream().filter((? super T e) -> !this.isPrimaryOwner(ch, (CacheEntry)e)).collect(Collectors.toSet());
            Stream<CacheEntry> suppliedStream = supplier.get();
            if (!set.isEmpty()) {
                return Stream.concat(set.stream(), suppliedStream);
            }
            return suppliedStream;
        };
    }

    @Override
    protected DistributedDoubleCacheStream doubleCacheStream() {
        return new TxDistributedDoubleCacheStream(this, this.localAddress, this.hash, this.ctx);
    }

    @Override
    protected DistributedLongCacheStream longCacheStream() {
        return new TxDistributedLongCacheStream(this, this.localAddress, this.hash, this.ctx);
    }

    @Override
    protected DistributedIntCacheStream intCacheStream() {
        return new TxDistributedIntCacheStream(this, this.localAddress, this.hash, this.ctx);
    }
}

