/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.query;

import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collector;
import java.util.stream.Stream;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.util.SerializableFunction;
import net.openhft.chronicle.core.util.SerializablePredicate;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.query.Query;
import net.openhft.chronicle.engine.api.query.Subscription;
import net.openhft.chronicle.engine.api.query.SubscriptionNotSupported;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.query.Filter;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteQuery<E>
implements Query<E> {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteQuery.class);
    private final Filter<E> filter = new Filter();
    private final Subscribable<E> subscribable;

    public RemoteQuery(Subscribable<E> eSubscribable) {
        this.subscribable = eSubscribable;
    }

    @Override
    public Query<E> filter(SerializablePredicate<? super E> predicate) {
        this.filter.addFilter(predicate);
        return this;
    }

    @Override
    public <R> Query<R> map(SerializableFunction<? super E, ? extends R> mapper) {
        this.filter.addMap(mapper);
        return this;
    }

    @Override
    public <R> Query<R> project(Class<R> rClass) {
        this.filter.addProject(rClass);
        return this;
    }

    @Override
    public <R> Query<R> flatMap(SerializableFunction<? super E, ? extends Query<? extends R>> mapper) {
        this.filter.addFlatMap(mapper);
        return this;
    }

    @Override
    public Stream<E> stream() {
        throw new UnsupportedOperationException("todo");
    }

    @Override
    public Subscription subscribe(Consumer<? super E> action) {
        this.subscribable.subscribe(action::accept, this.filter, EnumSet.of(RequestContext.Operation.BOOTSTRAP));
        return SubscriptionNotSupported.INSTANCE;
    }

    @Override
    public void forEach(Consumer<? super E> action) {
        this.forEach2(action);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void forEach2(Consumer<? super E> action) {
        final ArrayBlockingQueue queue = new ArrayBlockingQueue(128);
        final AtomicBoolean finished = new AtomicBoolean();
        Subscriber accept = new Subscriber<E>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onMessage(E o) throws InvalidSubscriberException {
                try {
                    boolean offer = queue.offer(o, 20L, TimeUnit.SECONDS);
                    BlockingQueue blockingQueue = queue;
                    synchronized (blockingQueue) {
                        queue.notifyAll();
                    }
                    if (!offer) {
                        LOG.error("Queue Full");
                        RemoteQuery.this.dumpThreads();
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void onEndOfSubscription() {
                finished.set(true);
                BlockingQueue blockingQueue = queue;
                synchronized (blockingQueue) {
                    queue.notifyAll();
                }
            }
        };
        this.subscribable.subscribe(accept, this.filter, EnumSet.of(RequestContext.Operation.BOOTSTRAP, RequestContext.Operation.END_SUBSCRIPTION_AFTER_BOOTSTRAP));
        while (!finished.get()) {
            try {
                Object message = queue.poll();
                if (message == null) {
                    ArrayBlockingQueue arrayBlockingQueue = queue;
                    synchronized (arrayBlockingQueue) {
                        queue.wait(1000L);
                        continue;
                    }
                }
                action.accept(message);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        queue.forEach(action::accept);
    }

    private void dumpThreads() {
        for (Map.Entry<Thread, StackTraceElement[]> entry : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (thread.getThreadGroup().getName().equals("system")) continue;
            StringBuilder sb = new StringBuilder();
            sb.append(thread).append(" ").append((Object)thread.getState());
            Jvm.trimStackTrace((StringBuilder)sb, (StackTraceElement[])entry.getValue());
            sb.append("\n");
            LOG.error("\n========= THREAD DUMP =========\n", (Object)sb);
        }
    }

    @Override
    public <R, A> R collect(Collector<? super E, A, R> collector) {
        Object container = collector.supplier().get();
        this.forEach((Consumer<? super E>)((Consumer<Object>)o -> collector.accumulator().accept(container, o)));
        return collector.finisher().apply(container);
    }

    public static interface Subscribable<E> {
        public void subscribe(@NotNull Subscriber<E> var1, @NotNull Filter<E> var2, @NotNull Set<RequestContext.Operation> var3);
    }
}

