/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableSubscriber;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import org.infinispan.commons.util.ByRef;
import org.infinispan.commons.util.IntSet;
import org.infinispan.commons.util.IntSets;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.factories.scopes.Scope;
import org.infinispan.factories.scopes.Scopes;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.reactive.publisher.impl.DeliveryGuarantee;
import org.infinispan.reactive.publisher.impl.LocalPublisherManager;
import org.infinispan.reactive.publisher.impl.SegmentAwarePublisher;
import org.infinispan.reactive.publisher.impl.commands.batch.InitialPublisherCommand;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

@Listener(observation=Listener.Observation.POST)
@Scope(value=Scopes.NAMED_CACHE)
public class PublisherHandler {
    // Logger for this class; the class is resolved through MethodHandles rather than a class literal.
    private static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    // Trace flag captured once at class initialisation; guards most of the tracef calls in this file.
    private static final boolean trace = log.isTraceEnabled();
    // Publisher states currently registered, keyed by the request id supplied to register().
    private final ConcurrentMap<Object, PublisherState> currentRequests = new ConcurrentHashMap<Object, PublisherState>();
    // Notifier this handler adds itself to in start() and removes itself from in stop().
    @Inject
    CacheManagerNotifier managerNotifier;
    // Executor on which PublisherState.results() schedules PublisherState.run().
    @Inject
    @ComponentName(value="org.infinispan.executors.non-blocking")
    ExecutorService nonBlockingExecutor;
    // Supplies the local key/entry publishers that each publisher state subscribes to.
    @Inject
    LocalPublisherManager localPublisherManager;

    /**
     * Cancels and unregisters every publisher whose originating node is absent from the new view.
     * <p>
     * States whose origin is {@code null} are never closed by this method.
     *
     * @param event the view change event supplying the new member list
     */
    @ViewChanged
    public void viewChange(ViewChangedEvent event) {
        List<Address> newMembers = event.getNewMembers();
        // The map's value type is already PublisherState, so a typed iterator avoids the raw type and the cast.
        Iterator<PublisherState> iter = this.currentRequests.values().iterator();
        while (iter.hasNext()) {
            PublisherState state = iter.next();
            Address owner = state.getOrigin();
            if (owner == null || newMembers.contains(owner)) {
                continue;
            }
            log.tracef("View changed and no longer contains %s, closing %s publisher", owner, state.requestId);
            state.cancel();
            // Remove through the iterator so the entry being visited is the one that is dropped.
            iter.remove();
        }
    }

    /**
     * Lifecycle hook: registers this handler with the cache manager notifier so that
     * {@link #viewChange(ViewChangedEvent)} is invoked.
     */
    @Start
    public void start() {
        managerNotifier.addListener(this);
    }

    /**
     * Lifecycle hook: unregisters this handler from the cache manager notifier.
     */
    @Stop
    public void stop() {
        managerNotifier.removeListener(this);
    }

    /**
     * Creates a publisher state for the command's request id, stores it, starts it and returns the future
     * for its first response.
     * <p>
     * A key-tracking state is created when {@code command.isTrackKeys()} is true, a plain one otherwise.
     * A previously registered state under the same id is cancelled if it had already completed.
     *
     * @param command the initial publisher command
     * @return the future obtained from the new state's {@code results()}
     * @throws IllegalStateException if a state was already registered under the id and had not completed
     */
    public <I, R> CompletableFuture<PublisherResponse> register(InitialPublisherCommand<?, I, R> command) {
        String requestId = command.getRequestId();
        PublisherState newState;
        if (command.isTrackKeys()) {
            newState = new KeyPublisherState(requestId, command.getOrigin(), command.getBatchSize());
        } else {
            newState = new PublisherState(requestId, command.getOrigin(), command.getBatchSize());
        }

        PublisherState replaced = currentRequests.put(requestId, newState);
        if (replaced != null) {
            if (!replaced.complete) {
                // This removes the state inserted just above; the incomplete prior state was already
                // displaced by put() and is not cancelled here.
                currentRequests.remove(requestId);
                throw new IllegalStateException("There was already a publisher registered for id " + requestId + " that wasn't complete!");
            }
            if (trace) {
                log.tracef("Closing prior state for %s to make room for a new request", requestId);
            }
            replaced.cancel();
        }

        newState.startProcessing(command);
        return newState.results();
    }

    /**
     * Returns the future for the next response of an already registered publisher.
     *
     * @param requestId id the publisher was registered under
     * @return the future obtained from the state's {@code results()}
     * @throws IllegalStateException if no publisher is registered under {@code requestId}
     */
    public CompletableFuture<PublisherResponse> getNext(String requestId) {
        PublisherState state = currentRequests.get(requestId);
        if (state != null) {
            return state.results();
        }
        throw new IllegalStateException("Publisher for requestId " + requestId + " doesn't exist!");
    }

    /**
     * @return the number of publisher states currently held in the request map
     */
    public int openPublishers() {
        return currentRequests.size();
    }

    /**
     * Unregisters the publisher stored under {@code requestId}, if any, and cancels it.
     *
     * @param requestId id of the publisher to close; unknown ids are ignored
     */
    public void closePublisher(String requestId) {
        PublisherState removed = currentRequests.remove(requestId);
        if (removed == null) {
            return;
        }
        if (trace) {
            log.tracef("Closed publisher using requestId %s", requestId);
        }
        removed.cancel();
    }

    /**
     * Unregisters and cancels {@code state}, but only if it is still the value mapped to {@code requestId}.
     * If the mapping has changed in the meantime, nothing is cancelled.
     *
     * @param requestId id the state was registered under
     * @param state     the exact state instance expected to be mapped
     */
    private void closePublisher(String requestId, PublisherState state) {
        boolean wasMapped = currentRequests.remove(requestId, state);
        if (!wasMapped) {
            if (trace) {
                log.tracef("A concurrent request already closed the prior state for %s", requestId);
            }
            return;
        }
        if (trace) {
            log.tracef("Closed publisher from completion using requestId %s", requestId);
        }
        state.cancel();
    }

    class KeyPublisherState
    extends PublisherState {
        // Overflow buffer filled by onNext once the main results array is full; null until first needed.
        Object[] extraValues;
        // Next write index into extraValues.
        int extraPos;
        // Keys recorded by keyCompleted when no deferred segment completion exists for them.
        Object[] keys;
        // Next write index into keys.
        int keyPos;
        // Running count of transformed values, advanced when each per-value inner publisher completes.
        long publisherOffset;
        // Count of values that have reached onNext.
        long consumerOffset;
        // Total amount requested from upstream through requestMore.
        long requestOffset;
        // Most recent original value observed from the local publisher; cleared once handled.
        Object keyForSegmentCompletion;
        // Original value -> segment whose completion is deferred until that value is fully consumed.
        Map<Object, Integer> keySegmentCompletions;
        // Publisher offset -> original value; onNext looks up its own offset here to detect a finished key.
        Map<Long, Object> keyCompletionPosition;
        // True when the previous value delivered to onNext was the last one for its key (initially true).
        boolean previousValueFinishedKey;

        /**
         * Creates a key-tracking state with empty bookkeeping maps and a key buffer of {@code batchSize} slots.
         *
         * @param requestId id this state is registered under
         * @param origin    address of the requestor
         * @param batchSize size of the result and key buffers
         */
        private KeyPublisherState(String requestId, Address origin, int batchSize) {
            super(requestId, origin, batchSize);
            keys = new Object[batchSize];
            previousValueFinishedKey = true;
            keySegmentCompletions = new HashMap<>();
            keyCompletionPosition = new HashMap<>();
        }

        /**
         * Subscribes this state to the local publisher and applies the command's transformer to each original
         * value individually, so that key and segment completion can be tracked against the transformed output.
         * <p>
         * The local publisher is requested with {@code EXACTLY_ONCE} and an identity function; the command's
         * own transformer is applied afterwards inside {@code concatMap}.
         *
         * @param command the initial command supplying segments, keys, loader flag and transformer
         */
        @Override
        void startProcessing(InitialPublisherCommand command) {
            SegmentAwarePublisher sap = command.isEntryStream() ? PublisherHandler.this.localPublisherManager.entryPublisher(command.getSegments(), command.getKeys(), command.getExcludedKeys(), command.isIncludeLoader(), DeliveryGuarantee.EXACTLY_ONCE, Function.identity()) : PublisherHandler.this.localPublisherManager.keyPublisher(command.getSegments(), command.getKeys(), command.getExcludedKeys(), command.isIncludeLoader(), DeliveryGuarantee.EXACTLY_ONCE, Function.identity());
            Function functionToApply = command.getTransformer();
            Flowable.fromPublisher(s -> sap.subscribe(s, this::segmentComplete, this::segmentLost)).doOnNext(originalValue -> {
                // Remember the latest original value so segmentComplete can defer a completion on it.
                this.keyForSegmentCompletion = originalValue;
            }).concatMap(originalValue -> {
                // Counts how many values the transformer emits for this single original value.
                ByRef.Integer size = new ByRef.Integer(0);
                return Flowable.fromPublisher((Publisher)((Publisher)functionToApply.apply(Flowable.just((Object)originalValue)))).doOnNext(ignore -> size.inc()).doOnComplete(() -> {
                    int total = size.get();
                    if (total > 0) {
                        this.publisherOffset += (long)total;
                        if (this.publisherOffset == this.consumerOffset) {
                            // onNext has already received every value produced so far: the key is finished now.
                            this.keyCompleted(originalValue);
                            this.previousValueFinishedKey = true;
                        } else {
                            // Otherwise record the offset at which onNext should treat this key as finished.
                            this.keyCompletionPosition.put(this.publisherOffset, originalValue);
                        }
                    } else {
                        // Nothing was emitted for this value, so onNext will not see it; release any
                        // segment completion that was deferred on it.
                        Integer segment = this.keySegmentCompletions.remove(originalValue);
                        if (segment != null) {
                            if (trace) {
                                log.tracef("Completing segment %s due to empty resulting value of %s for %s", segment, originalValue, this.requestId);
                            }
                            this.actualCompleteSegment(segment);
                        }
                        // Identity comparison: only clear the marker if it still refers to this very value.
                        if (this.keyForSegmentCompletion == originalValue) {
                            this.keyForSegmentCompletion = null;
                        }
                    }
                });
            }).subscribe((FlowableSubscriber)this);
        }

        /**
         * Builds a key-aware response from the current buffers, including the overflow values and recorded keys.
         *
         * @param complete whether this is the terminal response
         * @return a new {@code KeyPublisherResponse} over the current state
         */
        @Override
        PublisherResponse generateResponse(boolean complete) {
            return new KeyPublisherResponse(
                    results, completedSegments, lostSegments, pos, complete,
                    extraValues, extraPos, keys, keyPos);
        }

        /**
         * Merges two key-aware responses into one plain response whose array holds, in order, the results and
         * extra objects of the first response followed by those of the second.
         *
         * @param publisherResponse1 earlier response; must be a {@code KeyPublisherResponse}
         * @param publisherResponse2 later response; must be a {@code KeyPublisherResponse} and is asserted complete
         * @return the merged response
         */
        @Override
        PublisherResponse mergeResponses(PublisherResponse publisherResponse1, PublisherResponse publisherResponse2) {
            KeyPublisherResponse first = (KeyPublisherResponse) publisherResponse1;
            KeyPublisherResponse second = (KeyPublisherResponse) publisherResponse2;

            IntSet mergedCompleted = mergeSegments(first.getCompletedSegments(), second.getCompletedSegments());
            IntSet mergedLost = mergeSegments(first.getLostSegments(), second.getLostSegments());

            int total = first.getSize() + first.getExtraSize() + second.getSize() + second.getExtraSize();
            Object[] merged = new Object[total];

            int next = addToArray(first.getResults(), merged, 0);
            next = addToArray(first.getExtraObjects(), merged, next);
            next = addToArray(second.getResults(), merged, next);
            addToArray(second.getExtraObjects(), merged, next);

            boolean complete = second.isComplete();
            assert complete;
            return new PublisherResponse(merged, mergedCompleted, mergedLost, total, complete, merged.length);
        }

        /**
         * Traces the completion, then delegates to the superclass to prepare the terminal response.
         */
        @Override
        public void onComplete() {
            if (trace) {
                log.tracef("Completed state for %s", requestId);
            }
            super.onComplete();
        }

        /**
         * Adds the requested amount to {@code requestOffset} before forwarding the request upstream.
         *
         * @param subscription  subscription to request from
         * @param requestAmount number of additional items to request
         */
        @Override
        protected void requestMore(Subscription subscription, int requestAmount) {
            requestOffset = requestOffset + requestAmount;
            super.requestMore(subscription, requestAmount);
        }

        /**
         * Handles a key whose transformed values have all been consumed.
         * <p>
         * If a segment completion was deferred on this key, the segment is completed and the key buffer is
         * dropped; otherwise the key is appended to the key buffer.
         *
         * @param key the original value that has just been fully consumed
         */
        private void keyCompleted(Object key) {
            // Identity comparison: only clear the marker when it refers to this very instance.
            if (keyForSegmentCompletion == key) {
                keyForSegmentCompletion = null;
            }

            Integer pendingSegment = keySegmentCompletions.remove(key);
            if (pendingSegment == null) {
                if (keys == null) {
                    keys = new Object[batchSize];
                }
                keys[keyPos++] = key;
                return;
            }

            if (trace) {
                log.tracef("Completing segment %s from key %s for %s", pendingSegment, key, this.requestId);
            }
            actualCompleteSegment(pendingSegment);
            keys = null;
            keyPos = 0;
        }

        /**
         * Receives one transformed value.
         * <p>
         * A response is prepared first if the previous value finished its key and the main buffer is full.
         * The consumer offset is then advanced and checked against {@code keyCompletionPosition}. Finally the
         * value goes into the main buffer, or into the growable overflow buffer when the main one is full.
         *
         * @param value the transformed value
         */
        @Override
        public void onNext(Object value) {
            if (previousValueFinishedKey) {
                tryPrepareResponse();
            }

            consumerOffset++;
            Object finishedKey = keyCompletionPosition.remove(consumerOffset);
            previousValueFinishedKey = finishedKey != null;
            if (previousValueFinishedKey) {
                keyCompleted(finishedKey);
            }

            if (pos != results.length) {
                results[pos++] = value;
                return;
            }

            // Main buffer is full: spill into the overflow buffer, doubling it when it runs out of room.
            if (extraValues == null) {
                extraValues = new Object[8];
            }
            if (extraPos == extraValues.length) {
                Object[] grown = new Object[extraValues.length << 1];
                System.arraycopy(extraValues, 0, grown, 0, extraPos);
                extraValues = grown;
            }
            extraValues[extraPos++] = value;

            // Everything requested so far has arrived, so ask for a little more.
            if (consumerOffset == requestOffset) {
                requestMore(upstream, 2);
            }
        }

        /**
         * Resets the superclass buffers and then the key-specific buffers.
         */
        @Override
        void resetValues() {
            super.resetValues();
            keyResetValues();
        }

        /**
         * Drops the overflow and key buffers and rewinds their write positions to zero.
         */
        void keyResetValues() {
            keys = null;
            keyPos = 0;
            extraValues = null;
            extraPos = 0;
        }

        /**
         * Completes the segment at once when no original value is pending; otherwise defers the completion
         * until that pending value has been fully consumed.
         *
         * @param segment the segment reported complete by the local publisher
         */
        @Override
        public void segmentComplete(int segment) {
            if (keyForSegmentCompletion != null) {
                if (trace) {
                    log.tracef("Delaying segment completion for %s until key %s is fully consumed for %s", segment, this.keyForSegmentCompletion, this.requestId);
                }
                keySegmentCompletions.put(keyForSegmentCompletion, segment);
                keyForSegmentCompletion = null;
                return;
            }

            if (trace) {
                log.tracef("Completing segment %s for %s", segment, this.requestId);
            }
            actualCompleteSegment(segment);
        }

        /**
         * Records the segment as completed through the superclass and rewinds the key buffer position.
         *
         * @param segment the segment to mark completed
         */
        private void actualCompleteSegment(int segment) {
            super.segmentComplete(segment);
            keyPos = 0;
        }

        /**
         * Records the lost segment through the superclass and discards the overflow and key buffers.
         *
         * @param segment the segment reported lost
         */
        @Override
        public void segmentLost(int segment) {
            super.segmentLost(segment);
            keyResetValues();
        }

        /**
         * Delegates to the superclass; for a terminal response it first asserts that no deferred segment
         * completions or key completion positions remain.
         *
         * @param complete whether this is the terminal response
         */
        @Override
        void prepareResponse(boolean complete) {
            assert !complete || keySegmentCompletions.isEmpty();
            assert !complete || keyCompletionPosition.isEmpty();
            super.prepareResponse(complete);
        }

        /**
         * Prepares a non-terminal response, but only when the main result buffer is exactly full.
         */
        void tryPrepareResponse() {
            if (pos != results.length) {
                return;
            }
            prepareResponse(false);
        }
    }

    private class PublisherState
    implements FlowableSubscriber<Object>,
    Runnable {
        // Id this state is registered under in the enclosing handler's request map.
        final String requestId;
        // Address returned by getOrigin(); viewChange never closes states whose origin is null.
        final Address origin;
        // Size of the results buffer and the amount requested from upstream on each run().
        final int batchSize;
        // Either a pending future handed out by results() that awaits the next response, or an
        // already-completed response waiting to be retrieved; null when neither exists.
        @GuardedBy(value="this")
        private CompletableFuture<PublisherResponse> futureResponse = null;
        // Upstream subscription; assigned once in onSubscribe.
        Subscription upstream;
        // Buffer for the batch currently being filled.
        Object[] results;
        // Next write index into results.
        int pos;
        // Segments completed since the last reset; created lazily.
        IntSet completedSegments;
        // Segments lost since the last reset; created lazily.
        IntSet lostSegments;
        // Value of pos at the most recent segment completion; segmentLost rewinds pos to it.
        int segmentStart;
        // Set by prepareResponse and onError; read by register() and run().
        volatile boolean complete;

        /**
         * Creates a state with an empty result buffer of {@code batchSize} slots.
         *
         * @param requestId id this state is registered under
         * @param origin    address of the requestor
         * @param batchSize size of the result buffer
         */
        private PublisherState(String requestId, Address origin, int batchSize) {
            this.results = new Object[batchSize];
            this.batchSize = batchSize;
            this.origin = origin;
            this.requestId = requestId;
        }

        /**
         * Obtains the entry or key publisher described by the command and subscribes this state to it,
         * routing segment completion and loss notifications to this state.
         *
         * @param command the initial command supplying segments, keys, delivery guarantee and transformer
         */
        void startProcessing(InitialPublisherCommand command) {
            SegmentAwarePublisher sap;
            if (command.isEntryStream()) {
                sap = PublisherHandler.this.localPublisherManager.entryPublisher(command.getSegments(), command.getKeys(), command.getExcludedKeys(), command.isIncludeLoader(), command.getDeliveryGuarantee(), command.getTransformer());
            } else {
                sap = PublisherHandler.this.localPublisherManager.keyPublisher(command.getSegments(), command.getKeys(), command.getExcludedKeys(), command.isIncludeLoader(), command.getDeliveryGuarantee(), command.getTransformer());
            }
            sap.subscribe(this, this::segmentComplete, this::segmentLost);
        }

        /**
         * Stores the upstream subscription and issues the initial request of {@code batchSize + 1} items.
         *
         * @param s the upstream subscription; must not be null
         * @throws IllegalStateException if a subscription was already stored
         */
        public void onSubscribe(Subscription s) {
            if (upstream == null) {
                upstream = Objects.requireNonNull(s);
                requestMore(s, batchSize + 1);
                return;
            }
            throw new IllegalStateException("Subscription was already set!");
        }

        /**
         * Requests {@code requestAmount} further items from the given subscription.
         *
         * @param subscription  subscription to request from
         * @param requestAmount number of additional items to request
         */
        protected void requestMore(Subscription subscription, int requestAmount) {
            long amount = requestAmount;
            subscription.request(amount);
        }

        /**
         * Marks this state complete and delivers the failure to the requestor.
         * <p>
         * If {@link #results()} has already handed out a future that is still pending, that same future is
         * failed so its waiter is released. Previously it was overwritten and never completed. Otherwise a
         * pre-failed future is stored for the next {@link #results()} call.
         *
         * @param t the error signalled by upstream
         */
        public void onError(Throwable t) {
            this.complete = true;
            CompletableFuture<PublisherResponse> waiting = null;
            synchronized (this) {
                if (this.futureResponse != null && !this.futureResponse.isDone()) {
                    // Leave the field pointing at this future so later results() calls also see the failure.
                    waiting = this.futureResponse;
                } else {
                    this.futureResponse = CompletableFutures.completedExceptionFuture(t);
                }
            }
            // Complete outside the monitor, as prepareResponse does, so dependents never run under the lock.
            if (waiting != null) {
                waiting.completeExceptionally(t);
            }
        }

        /**
         * Upstream finished: prepares the terminal response.
         */
        public void onComplete() {
            prepareResponse(true);
        }

        /**
         * Buffers one value, first publishing the current batch as a non-terminal response if the buffer is
         * already full.
         *
         * @param o the value emitted by upstream
         */
        public void onNext(Object o) {
            boolean batchFull = pos == results.length;
            if (batchFull) {
                // prepareResponse(false) swaps in a fresh buffer and resets pos.
                prepareResponse(false);
            }
            results[pos] = o;
            pos++;
        }

        /**
         * Records a completed segment and remembers the current buffer position as the start of the next one.
         *
         * @param segment the segment reported complete
         */
        public void segmentComplete(int segment) {
            IntSet completed = completedSegments;
            if (completed == null) {
                completed = IntSets.mutableEmptySet();
                completedSegments = completed;
            }
            completed.add(segment);
            segmentStart = pos;
        }

        /**
         * Records a lost segment and rewinds the buffer position to where the last completed segment ended.
         *
         * @param segment the segment reported lost
         */
        public void segmentLost(int segment) {
            IntSet lost = lostSegments;
            if (lost == null) {
                lost = IntSets.mutableEmptySet();
                lostSegments = lost;
            }
            lost.add(segment);
            pos = segmentStart;
        }

        /**
         * Cancels the upstream subscription if one has been set; otherwise does nothing.
         */
        public void cancel() {
            Subscription current = upstream;
            if (current == null) {
                return;
            }
            current.cancel();
        }

        /**
         * Starts a fresh batch: new result buffer, no recorded segments, positions back at zero.
         */
        void resetValues() {
            pos = 0;
            segmentStart = 0;
            completedSegments = null;
            lostSegments = null;
            results = new Object[batchSize];
        }

        /**
         * Builds a response over the current buffer and segment sets.
         *
         * @param complete whether this is the terminal response
         * @return a new {@code PublisherResponse} over the current state
         */
        PublisherResponse generateResponse(boolean complete) {
            return new PublisherResponse(
                    results, completedSegments, lostSegments, pos, complete, segmentStart);
        }

        /**
         * Packages the current buffers into a response and hands it to the requestor.
         * <p>
         * Under the monitor, one of three things happens. If no future is stored, the response is stored as an
         * already-completed future. If a completed future is stored, the new response is merged into it. If a
         * pending future is stored, it is taken and completed after the monitor is released.
         *
         * @param complete whether this is the terminal response; also written to the {@code complete} flag
         */
        void prepareResponse(boolean complete) {
            PublisherResponse response = this.generateResponse(complete);
            if (trace) {
                log.tracef("Response ready %s with id %s for requestor %s", response, this.requestId, this.origin);
            }
            // For a non-terminal response, start a fresh batch; the response keeps the old buffers.
            if (!complete) {
                this.resetValues();
            }
            this.complete = complete;
            CompletableFuture<PublisherResponse> futureToComplete = null;
            PublisherState publisherState = this;
            synchronized (publisherState) {
                if (this.futureResponse != null) {
                    if (this.futureResponse.isDone()) {
                        // An earlier response has not been retrieved yet: fold the new one into it.
                        PublisherResponse prevResponse = this.futureResponse.join();
                        PublisherResponse newResponse = this.mergeResponses(prevResponse, response);
                        this.futureResponse = CompletableFuture.completedFuture(newResponse);
                        if (trace) {
                            log.tracef("Received additional response, merged responses together %d for request id %s", System.identityHashCode(this.futureResponse), this.requestId);
                        }
                    } else {
                        // A caller is waiting on this future: take it and complete it outside the monitor.
                        futureToComplete = this.futureResponse;
                        this.futureResponse = null;
                    }
                } else {
                    // Nobody is waiting: park the response for the next results() call.
                    this.futureResponse = CompletableFuture.completedFuture(response);
                    if (trace) {
                        log.tracef("Eager response completed %d for request id %s", System.identityHashCode(this.futureResponse), this.requestId);
                    }
                }
            }
            if (futureToComplete != null) {
                if (trace) {
                    log.tracef("Completing waiting future %d for request id %s", System.identityHashCode(futureToComplete), this.requestId);
                }
                futureToComplete.complete(response);
            }
        }

        /**
         * Merges two responses into one whose array holds the first response's results followed by the
         * second's, and whose segment sets are the merged sets of both.
         *
         * @param response1 earlier response
         * @param response2 later response; asserted to be complete
         * @return the merged response
         */
        PublisherResponse mergeResponses(PublisherResponse response1, PublisherResponse response2) {
            IntSet mergedCompleted = mergeSegments(response1.getCompletedSegments(), response2.getCompletedSegments());
            IntSet mergedLost = mergeSegments(response1.getLostSegments(), response2.getLostSegments());

            int total = response1.getSize() + response2.getSize();
            Object[] merged = new Object[total];
            int next = addToArray(response1.getResults(), merged, 0);
            addToArray(response2.getResults(), merged, next);

            boolean complete = response2.isComplete();
            assert complete;
            return new PublisherResponse(merged, mergedCompleted, mergedLost, total, complete, merged.length);
        }

        /**
         * Copies the leading non-null elements of {@code src} into {@code dst} starting at {@code offset},
         * stopping at the first null element or at the end of {@code src}.
         *
         * @param src    source array; may be null, in which case nothing is copied
         * @param dst    destination array
         * @param offset index in {@code dst} at which to start writing
         * @return the index in {@code dst} just past the last element written
         */
        int addToArray(Object[] src, Object[] dst, int offset) {
            if (src == null) {
                return offset;
            }
            int next = offset;
            for (int i = 0; i < src.length && src[i] != null; i++) {
                dst[next++] = src[i];
            }
            return next;
        }

        /**
         * Combines two optional segment sets. When both are present, the second is added into the first,
         * which is mutated and returned; when only one is present it is returned as is.
         *
         * @param segments1 first set, possibly null
         * @param segments2 second set, possibly null
         * @return the combined set, or null when both inputs are null
         */
        IntSet mergeSegments(IntSet segments1, IntSet segments2) {
            if (segments1 != null && segments2 != null) {
                segments1.addAll(segments2);
                return segments1;
            }
            return segments1 == null ? segments2 : segments1;
        }

        /**
         * @return the origin address this state was created with; may be null
         */
        public Address getOrigin() {
            return origin;
        }

        /**
         * Returns the future for the next response.
         * <p>
         * If a future is already stored, it is taken (the field is cleared) and {@link #run()} is submitted
         * to the non-blocking executor. Otherwise a new pending future is created and stored, with
         * {@link #run()} chained to execute asynchronously once it completes.
         *
         * @return the future carrying the next response
         */
        CompletableFuture<PublisherResponse> results() {
            CompletableFuture<PublisherResponse> handedOut;
            boolean tookStoredFuture;
            synchronized (this) {
                CompletableFuture<PublisherResponse> stored = this.futureResponse;
                tookStoredFuture = stored != null;
                if (tookStoredFuture) {
                    handedOut = stored;
                    this.futureResponse = null;
                } else {
                    handedOut = new CompletableFuture<>();
                    handedOut.thenRunAsync(this, PublisherHandler.this.nonBlockingExecutor);
                    this.futureResponse = handedOut;
                }
            }
            // Submit outside the monitor so the executor is never invoked while holding the lock.
            if (tookStoredFuture) {
                PublisherHandler.this.nonBlockingExecutor.execute(this);
            }
            if (trace) {
                log.tracef("Retrieved future %d for request id %s", System.identityHashCode(handedOut), this.requestId);
            }
            return handedOut;
        }

        /**
         * Executor task scheduled by {@link #results()}.
         * <p>
         * While the state is not complete it requests another {@code batchSize} items from upstream. Once
         * complete, it closes this publisher through the enclosing handler, provided no response is stored.
         */
        @Override
        public void run() {
            if (trace) {
                log.tracef("Running handler for request id %s", this.requestId);
            }

            if (this.complete) {
                synchronized (this) {
                    if (this.futureResponse == null) {
                        PublisherHandler.this.closePublisher(this.requestId, this);
                    } else if (trace) {
                        log.tracef("Skipping run as handler is complete, but still has some results for id %s", this.requestId);
                    }
                }
                return;
            }

            int requestAmount = this.batchSize;
            if (trace) {
                log.tracef("Requesting %d additional entries for %s", requestAmount, this.requestId);
            }
            requestMore(this.upstream, requestAmount);
        }
    }
}

