package org.infinispan.notifications.cachelistener;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.infinispan.cache.impl.EncoderCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commons.marshall.StreamingMarshaller;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfiguration;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.entries.TransientMortalCacheEntry;
import org.infinispan.container.impl.InternalEntryFactoryImpl;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.NonTxInvocationContext;
import org.infinispan.distribution.ch.KeyPartitioner;
import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.impl.BasicComponentRegistry;
import org.infinispan.factories.impl.MockBasicComponentRegistry;
import org.infinispan.interceptors.locking.ClusteringDependentLogic;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.cluster.ClusterEventManager;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.filter.CacheEventConverter;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.notifications.cachelistener.filter.EventType;
import org.infinispan.reactive.publisher.impl.ClusterPublisherManager;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.Notifications;
import org.infinispan.reactive.publisher.impl.SegmentPublisherSupplier;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.BlockingManager;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"unit"}, testName = "notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest")
/* loaded from: input_file:org/infinispan/notifications/cachelistener/BaseCacheNotifierImplInitialTransferTest.class */
public abstract class BaseCacheNotifierImplInitialTransferTest extends AbstractInfinispanTest {
    CacheNotifierImpl n;
    EncoderCache mockCache;
    InvocationContext ctx;
    ClusterPublisherManager mockPublisherManager;
    protected CacheMode cacheMode;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/BaseCacheNotifierImplInitialTransferTest$Operation.class */
    public enum Operation {
        PUT(Event.Type.CACHE_ENTRY_MODIFIED) { // from class: org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation.1
            @Override // org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation
            public void raiseEvent(CacheNotifier cacheNotifier, Object obj, Object obj2, Object obj3, InvocationContext invocationContext) {
                cacheNotifier.notifyCacheEntryModified(obj, obj3, (Metadata) null, obj2, (Metadata) null, true, invocationContext, (FlagAffectedCommand) null);
                cacheNotifier.notifyCacheEntryModified(obj, obj3, (Metadata) null, obj2, (Metadata) null, false, invocationContext, (FlagAffectedCommand) null);
            }
        },
        REMOVE(Event.Type.CACHE_ENTRY_REMOVED) { // from class: org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation.2
            @Override // org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation
            public void raiseEvent(CacheNotifier cacheNotifier, Object obj, Object obj2, Object obj3, InvocationContext invocationContext) {
                cacheNotifier.notifyCacheEntryRemoved(obj, obj2, (Metadata) null, true, invocationContext, (FlagAffectedCommand) null);
                cacheNotifier.notifyCacheEntryRemoved(obj, obj2, (Metadata) null, false, invocationContext, (FlagAffectedCommand) null);
            }
        },
        CREATE(Event.Type.CACHE_ENTRY_CREATED) { // from class: org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation.3
            @Override // org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.Operation
            public void raiseEvent(CacheNotifier cacheNotifier, Object obj, Object obj2, Object obj3, InvocationContext invocationContext) {
                cacheNotifier.notifyCacheEntryCreated(obj, obj3, (Metadata) null, true, invocationContext, (FlagAffectedCommand) null);
                cacheNotifier.notifyCacheEntryCreated(obj, obj3, (Metadata) null, false, invocationContext, (FlagAffectedCommand) null);
            }
        };

        private final Event.Type type;

        Operation(Event.Type type) {
            this.type = type;
        }

        public Event.Type getType() {
            return this.type;
        }

        public abstract void raiseEvent(CacheNotifier cacheNotifier, Object obj, Object obj2, Object obj3, InvocationContext invocationContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/BaseCacheNotifierImplInitialTransferTest$StateListener.class */
    public static abstract class StateListener<K, V> {
        final List<CacheEntryEvent<K, V>> events = new ArrayList();
        private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());

        protected StateListener() {
        }

        @CacheEntryCreated
        @CacheEntryModified
        @CacheEntryRemoved
        public synchronized void onCacheNotification(CacheEntryEvent<K, V> cacheEntryEvent) {
            log.tracef("Received event: %s", cacheEntryEvent);
            this.events.add(cacheEntryEvent);
        }
    }

    @Listener(includeCurrentState = true, clustered = true)
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/BaseCacheNotifierImplInitialTransferTest$StateListenerClustered.class */
    private static class StateListenerClustered extends StateListener {
        private StateListenerClustered() {
        }
    }

    @Listener(includeCurrentState = true, clustered = false)
    /* loaded from: input_file:org/infinispan/notifications/cachelistener/BaseCacheNotifierImplInitialTransferTest$StateListenerNotClustered.class */
    private static class StateListenerNotClustered extends StateListener {
        private StateListenerNotClustered() {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public BaseCacheNotifierImplInitialTransferTest(CacheMode cacheMode) {
        if (cacheMode.isDistributed()) {
            throw new IllegalArgumentException("This test only works with non distributed cache modes");
        }
        this.cacheMode = cacheMode;
    }

    @BeforeMethod
    public void setUp() {
        this.n = new CacheNotifierImpl();
        this.mockCache = (EncoderCache) Mockito.mock(EncoderCache.class);
        Mockito.when(this.mockCache.getCacheManager()).thenReturn((EmbeddedCacheManager) Mockito.mock(EmbeddedCacheManager.class));
        Mockito.when(this.mockCache.getAdvancedCache()).thenReturn(this.mockCache);
        Mockito.when(this.mockCache.getKeyDataConversion()).thenReturn(DataConversion.IDENTITY_KEY);
        Mockito.when(this.mockCache.getValueDataConversion()).thenReturn(DataConversion.IDENTITY_VALUE);
        Configuration build = new ConfigurationBuilder().clustering().cacheMode(this.cacheMode).build();
        GlobalConfiguration build2 = GlobalConfigurationBuilder.defaultClusteredBuilder().build();
        Mockito.when(this.mockCache.getStatus()).thenReturn(ComponentStatus.INITIALIZING);
        this.mockPublisherManager = (ClusterPublisherManager) Mockito.mock(ClusterPublisherManager.class);
        ComponentRegistry componentRegistry = (ComponentRegistry) Mockito.mock(ComponentRegistry.class);
        Mockito.when(this.mockCache.getComponentRegistry()).thenReturn(componentRegistry);
        MockBasicComponentRegistry mockBasicComponentRegistry = new MockBasicComponentRegistry();
        Mockito.when((BasicComponentRegistry) componentRegistry.getComponent(BasicComponentRegistry.class)).thenReturn(mockBasicComponentRegistry);
        mockBasicComponentRegistry.registerMocks(RpcManager.class, CommandsFactory.class);
        mockBasicComponentRegistry.registerMock("org.infinispan.marshaller.internal", StreamingMarshaller.class);
        ClusteringDependentLogic.LocalLogic localLogic = new ClusteringDependentLogic.LocalLogic();
        localLogic.init((Transport) null, build, (KeyPartitioner) Mockito.mock(KeyPartitioner.class));
        ClusterEventManager clusterEventManager = (ClusterEventManager) Mockito.mock(ClusterEventManager.class);
        Mockito.when(clusterEventManager.sendEvents(ArgumentMatchers.any())).thenReturn(CompletableFutures.completedNull());
        BlockingManager blockingManager = (BlockingManager) Mockito.mock(BlockingManager.class);
        Mockito.when(blockingManager.continueOnNonBlockingThread((CompletionStage) ArgumentMatchers.any(), ArgumentMatchers.any())).thenReturn(CompletableFutures.completedNull());
        TestingUtil.inject(this.n, this.mockCache, localLogic, build, build2, mockBasicComponentRegistry, this.mockPublisherManager, new InternalEntryFactoryImpl(), clusterEventManager, Mockito.mock(KeyPartitioner.class), blockingManager, TestingUtil.named("org.infinispan.executors.notification", new WithinThreadExecutor()));
        this.n.start();
        this.ctx = new NonTxInvocationContext((Address) null);
    }

    public void testSimpleCacheStartingClusterListener() {
        testSimpleCacheStarting(new StateListenerClustered());
    }

    private void testSimpleCacheStarting(StateListener<String, String> stateListener) {
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(new ImmortalCacheEntry("key-" + i, "value-" + i));
        }
        Mockito.when(this.mockPublisherManager.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any())).thenReturn(wrapCompletionPublisher(Flowable.fromIterable(arrayList)));
        this.n.addListener(stateListener);
        verifyEvents(isClustered(stateListener), stateListener, arrayList);
    }

    public void testFilterConverterUnusedDuringIteration() {
        testFilterConverterUnusedDuringIteration(new StateListenerClustered());
    }

    private void testFilterConverterUnusedDuringIteration(StateListener<String, String> stateListener) {
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(new ImmortalCacheEntry("key-" + i, "value-" + i));
        }
        Mockito.when(this.mockPublisherManager.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any())).thenReturn(wrapCompletionPublisher(Flowable.fromIterable(arrayList)));
        CacheEventFilter cacheEventFilter = (CacheEventFilter) Mockito.mock(CacheEventFilter.class, Mockito.withSettings().serializable());
        CacheEventConverter cacheEventConverter = (CacheEventConverter) Mockito.mock(CacheEventConverter.class, Mockito.withSettings().serializable());
        this.n.addListener(stateListener, cacheEventFilter, cacheEventConverter);
        verifyEvents(isClustered(stateListener), stateListener, arrayList);
        ((CacheEventFilter) Mockito.verify(cacheEventFilter, Mockito.never())).accept(ArgumentMatchers.any(), ArgumentMatchers.any(), (Metadata) ArgumentMatchers.any(Metadata.class), ArgumentMatchers.any(), (Metadata) ArgumentMatchers.any(Metadata.class), (EventType) ArgumentMatchers.any(EventType.class));
        ((CacheEventConverter) Mockito.verify(cacheEventConverter, Mockito.never())).convert(ArgumentMatchers.any(), ArgumentMatchers.any(), (Metadata) ArgumentMatchers.any(Metadata.class), ArgumentMatchers.any(), (Metadata) ArgumentMatchers.any(Metadata.class), (EventType) ArgumentMatchers.any(EventType.class));
    }

    public void testMetadataAvailable() {
        ArrayList arrayList = new ArrayList(10);
        for (int i = 0; i < 10; i++) {
            arrayList.add(new TransientMortalCacheEntry("key-" + i, "value-" + i, i, -1L, System.currentTimeMillis()));
        }
        Mockito.when(this.mockPublisherManager.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any())).thenReturn(wrapCompletionPublisher(Flowable.fromIterable(arrayList)));
        CacheEventFilter cacheEventFilter = (CacheEventFilter) Mockito.mock(CacheEventFilter.class, Mockito.withSettings().serializable());
        CacheEventConverter cacheEventConverter = (CacheEventConverter) Mockito.mock(CacheEventConverter.class, Mockito.withSettings().serializable());
        StateListenerClustered stateListenerClustered = new StateListenerClustered();
        this.n.addListener(stateListenerClustered, cacheEventFilter, cacheEventConverter);
        verifyEvents(isClustered(stateListenerClustered), stateListenerClustered, arrayList);
        Iterator it = stateListenerClustered.events.iterator();
        while (it.hasNext()) {
            CacheEntryEvent cacheEntryEvent = (CacheEntryEvent) it.next();
            String str = (String) cacheEntryEvent.getKey();
            Metadata metadata = cacheEntryEvent.getMetadata();
            Assert.assertNotNull(metadata);
            Assert.assertEquals(metadata.lifespan(), -1L);
            Assert.assertEquals(metadata.maxIdle(), Long.parseLong(str.substring(4)));
        }
    }

    private void verifyEvents(boolean z, StateListener<String, String> stateListener, List<CacheEntry<String, String>> list) {
        boolean z2;
        int i;
        Assert.assertEquals(stateListener.events.size(), z ? list.size() : list.size() * 2);
        int i2 = 0;
        for (CacheEntryEvent<String, String> cacheEntryEvent : stateListener.events) {
            if (z) {
                z2 = true;
                i = i2;
            } else {
                z2 = (i2 & 1) == 1;
                i = i2 / 2;
            }
            Assert.assertEquals(cacheEntryEvent.getType(), Event.Type.CACHE_ENTRY_CREATED);
            Assert.assertEquals((String) cacheEntryEvent.getKey(), (String) list.get(i).getKey());
            Assert.assertEquals(cacheEntryEvent.isPre(), !z2);
            if (z2) {
                Assert.assertEquals((String) cacheEntryEvent.getValue(), (String) list.get(i).getValue());
            } else {
                Assert.assertNull(cacheEntryEvent.getValue());
            }
            i2++;
        }
    }

    public void testCreateAfterIterationBeganButNotIteratedValueYetClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.CREATE);
    }

    public void testRemoveAfterIterationBeganButNotIteratedValueYetClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.REMOVE);
    }

    public void testModificationAfterIterationBeganButNotIteratedValueYetClustered() throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
        testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.PUT);
    }

    private void testModificationAfterIterationBeganButNotIteratedValueYet(StateListener<String, String> stateListener, Operation operation) throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(new ImmortalCacheEntry("key-" + i, "value-" + i));
        }
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Mockito.when(this.mockPublisherManager.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any())).thenReturn(wrapCompletionPublisher(Flowable.defer(() -> {
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            return Flowable.fromIterable(arrayList);
        })));
        Future fork = fork(() -> {
            this.n.addListener(stateListener);
            return null;
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        log.debugf("Synced with publisher, performing operation", new Object[0]);
        String str = (String) arrayList.get(3).getValue();
        switch (operation) {
            case REMOVE:
                this.n.notifyCacheEntryRemoved("key-3", str, (Metadata) null, true, this.ctx, (FlagAffectedCommand) null);
                this.n.notifyCacheEntryRemoved("key-3", str, (Metadata) null, false, this.ctx, (FlagAffectedCommand) null);
                arrayList.remove(3);
                break;
            case CREATE:
                this.n.notifyCacheEntryCreated("new-key", "new-value", (Metadata) null, true, this.ctx, (FlagAffectedCommand) null);
                this.n.notifyCacheEntryCreated("new-key", "new-value", (Metadata) null, false, this.ctx, (FlagAffectedCommand) null);
                arrayList.add(new ImmortalCacheEntry("new-key", "new-value"));
                break;
            case PUT:
                this.n.notifyCacheEntryModified("key-3", "value-3-changed", (Metadata) null, str, (Metadata) null, true, this.ctx, (FlagAffectedCommand) null);
                this.n.notifyCacheEntryModified("key-3", "value-3-changed", (Metadata) null, str, (Metadata) null, false, this.ctx, (FlagAffectedCommand) null);
                arrayList.remove(3);
                arrayList.add(3, new ImmortalCacheEntry("key-3", "value-3-changed"));
                break;
            default:
                throw new IllegalArgumentException("Unsupported Operation provided " + operation);
        }
        log.debugf("Operation done, let the iteration complete", new Object[0]);
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        fork.get(10L, TimeUnit.MINUTES);
        verifyEvents(isClustered(stateListener), stateListener, arrayList);
    }

    public void testCreateAfterIterationBeganAndIteratedValueClustered() throws Exception {
        testModificationAfterIterationBeganAndIteratedValue(new StateListenerClustered(), Operation.CREATE);
    }

    public void testRemoveAfterIterationBeganAndIteratedValueClustered() throws Exception {
        testModificationAfterIterationBeganAndIteratedValue(new StateListenerClustered(), Operation.REMOVE);
    }

    public void testModificationAfterIterationBeganAndIteratedValueClustered() throws Exception {
        testModificationAfterIterationBeganAndIteratedValue(new StateListenerClustered(), Operation.PUT);
    }

    private void testModificationAfterIterationBeganAndIteratedValue(StateListener<String, String> stateListener, Operation operation) throws Exception {
        String str;
        String str2;
        String str3;
        ArrayList<CacheEntry> arrayList = new ArrayList();
        for (int i = 0; i < 10; i++) {
            arrayList.add(new ImmortalCacheEntry("key-" + i, "value-" + i));
        }
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Mockito.when(this.mockPublisherManager.entryPublisher((IntSet) ArgumentMatchers.any(), (Set) ArgumentMatchers.any(), (InvocationContext) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), (DeliveryGuarantee) ArgumentMatchers.any(), ArgumentMatchers.anyInt(), (Function) ArgumentMatchers.any())).thenReturn(wrapCompletionPublisher(Flowable.fromIterable(arrayList).doOnComplete(() -> {
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
            cyclicBarrier.await(10L, TimeUnit.SECONDS);
        })));
        Future fork = fork(() -> {
            this.n.addListener(stateListener);
            return null;
        });
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        switch (operation) {
            case REMOVE:
                str = "key-3";
                str2 = null;
                str3 = (String) ((CacheEntry) arrayList.get(3)).getValue();
                break;
            case CREATE:
                str = "new-key";
                str2 = "new-value";
                str3 = null;
                break;
            case PUT:
                str = "key-3";
                str2 = "key-3-new";
                str3 = (String) ((CacheEntry) arrayList.get(3)).getValue();
                break;
            default:
                throw new IllegalArgumentException("Unsupported Operation provided " + operation);
        }
        operation.raiseEvent(this.n, str, str3, str2, this.ctx);
        cyclicBarrier.await(10L, TimeUnit.SECONDS);
        fork.get(10L, TimeUnit.MINUTES);
        boolean isClustered = isClustered(stateListener);
        Assert.assertEquals(stateListener.events.size(), isClustered ? arrayList.size() + 1 : (arrayList.size() + 1) * 2);
        int i2 = 0;
        for (CacheEntry cacheEntry : arrayList) {
            if (isClustered) {
                CacheEntryEvent<String, String> cacheEntryEvent = stateListener.events.get(i2);
                Assert.assertEquals(cacheEntryEvent.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertEquals(cacheEntryEvent.isPre(), false);
                Assert.assertEquals((String) cacheEntryEvent.getKey(), (String) cacheEntry.getKey());
                Assert.assertEquals((String) cacheEntryEvent.getValue(), (String) cacheEntry.getValue());
            } else {
                CacheEntryEvent<String, String> cacheEntryEvent2 = stateListener.events.get(i2 * 2);
                Assert.assertEquals(cacheEntryEvent2.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertEquals(cacheEntryEvent2.isPre(), true);
                Assert.assertEquals((String) cacheEntryEvent2.getKey(), (String) cacheEntry.getKey());
                Assert.assertNull(cacheEntryEvent2.getValue());
                CacheEntryEvent<String, String> cacheEntryEvent3 = stateListener.events.get((i2 * 2) + 1);
                Assert.assertEquals(cacheEntryEvent3.getType(), Event.Type.CACHE_ENTRY_CREATED);
                Assert.assertEquals(cacheEntryEvent3.isPre(), false);
                Assert.assertEquals((String) cacheEntryEvent3.getKey(), (String) cacheEntry.getKey());
                Assert.assertEquals((String) cacheEntryEvent3.getValue(), (String) cacheEntry.getValue());
            }
            i2++;
        }
        if (isClustered) {
            CacheEntryEvent<String, String> cacheEntryEvent4 = stateListener.events.get(i2);
            Assert.assertEquals(cacheEntryEvent4.getType(), operation.getType());
            Assert.assertEquals(cacheEntryEvent4.isPre(), false);
            Assert.assertEquals((String) cacheEntryEvent4.getKey(), str);
            Assert.assertEquals((String) cacheEntryEvent4.getValue(), str2);
            return;
        }
        CacheEntryEvent<String, String> cacheEntryEvent5 = stateListener.events.get(i2 * 2);
        Assert.assertEquals(cacheEntryEvent5.getType(), operation.getType());
        Assert.assertEquals(cacheEntryEvent5.isPre(), true);
        Assert.assertEquals((String) cacheEntryEvent5.getKey(), str);
        Assert.assertEquals((String) cacheEntryEvent5.getValue(), str3);
        CacheEntryEvent<String, String> cacheEntryEvent6 = stateListener.events.get((i2 * 2) + 1);
        Assert.assertEquals(cacheEntryEvent6.getType(), operation.getType());
        Assert.assertEquals(cacheEntryEvent6.isPre(), false);
        Assert.assertEquals((String) cacheEntryEvent6.getKey(), str);
        Assert.assertEquals((String) cacheEntryEvent6.getValue(), str2);
    }

    private boolean isClustered(StateListener stateListener) {
        return stateListener.getClass().getAnnotation(Listener.class).clustered();
    }

    private SegmentPublisherSupplier<CacheEntry<String, String>> wrapCompletionPublisher(final Flowable<CacheEntry<String, String>> flowable) {
        return new SegmentPublisherSupplier<CacheEntry<String, String>>() { // from class: org.infinispan.notifications.cachelistener.BaseCacheNotifierImplInitialTransferTest.1
            public Publisher<CacheEntry<String, String>> publisherWithoutSegments() {
                return flowable;
            }

            public Publisher<SegmentPublisherSupplier.Notification<CacheEntry<String, String>>> publisherWithSegments() {
                return flowable.map(cacheEntry -> {
                    return Notifications.value(cacheEntry, 0);
                }).concatWith(Single.just(Notifications.segmentComplete(0)));
            }
        };
    }
}
