/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.server.internal;

import java.util.function.BiConsumer;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.engine.api.EngineReplication;
import net.openhft.chronicle.engine.api.pubsub.Replication;
import net.openhft.chronicle.engine.map.CMap2EngineReplicator;
import net.openhft.chronicle.engine.map.replication.Bootstrap;
import net.openhft.chronicle.engine.server.internal.AbstractHandler;
import net.openhft.chronicle.engine.tree.HostIdentifier;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.WireOutPublisher;
import net.openhft.chronicle.threads.HandlerPriority;
import net.openhft.chronicle.threads.api.EventHandler;
import net.openhft.chronicle.threads.api.EventLoop;
import net.openhft.chronicle.threads.api.InvalidEventHandlerException;
import net.openhft.chronicle.wire.ParameterizeWireKey;
import net.openhft.chronicle.wire.ReadMarshallable;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.WriteMarshallable;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server-side handler for replication messages.
 *
 * <p>{@link #process} stores the collaborators of the current request in fields and then
 * dispatches on the event name read from the inbound wire:
 * <ul>
 *   <li>{@code lastUpdateTime} - records the remote's last modification time on the
 *       {@link Replication};</li>
 *   <li>{@code replicationEvent} - un-marshals one replicated entry and applies it;</li>
 *   <li>{@code identifier} - replies with this host's identifier;</li>
 *   <li>{@code bootstrap} - replies with this host's {@link Bootstrap} and registers a
 *       {@link ReplicationEventHandler} on the event loop that streams modified entries
 *       back to the requester.</li>
 * </ul>
 *
 * <p>The type parameter {@code E} is not used anywhere inside this class.
 */
public class ReplicationHandler<E> extends AbstractHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationHandler.class);

    /** Scratch buffer, cleared and refilled with the event name of every inbound message. */
    private final StringBuilder eventName = new StringBuilder();

    // State of the most recent process(...) call. Handlers registered by an earlier
    // bootstrap request read hostId / publisher / eventLoop from these fields each time
    // they run, so they always observe the latest values.
    private Replication replication;
    private WireOutPublisher publisher;
    private HostIdentifier hostId;
    private long tid;
    private EventLoop eventLoop;

    /** Dispatches one inbound message; the second argument is the transaction id. */
    @NotNull
    private final BiConsumer<WireIn, Long> dataConsumer = new BiConsumer<WireIn, Long>() {
        // One reusable entry per thread, so applying a replication event allocates nothing.
        final ThreadLocal<CMap2EngineReplicator.VanillaReplicatedEntry> vre =
                ThreadLocal.withInitial(CMap2EngineReplicator.VanillaReplicatedEntry::new);

        @Override
        public void accept(@NotNull WireIn inWire, Long inputTid) {
            eventName.setLength(0);
            final ValueIn valueIn = inWire.readEventName(eventName);

            // The first two events are one-way: they produce no reply on outWire.
            if (CoreFields.lastUpdateTime.contentEquals(eventName)) {
                onLastUpdateTime(inWire, valueIn);
                return;
            }
            if (EventId.replicationEvent.contentEquals(eventName)) {
                onReplicationEvent(valueIn);
                return;
            }

            // Every other event gets the tid document written first, whether or not one
            // of the two branches below recognises the event name.
            assert outWire != null;
            outWire.writeDocument(true, wire -> outWire.writeEventName(CoreFields.tid).int64(tid));

            if (EventId.identifier.contentEquals(eventName)) {
                writeData(inWire.bytes(),
                        out -> outWire.write(EventId.identifierReply).int8(hostId.hostId()));
            }
            if (EventId.bootstrap.contentEquals(eventName)) {
                writeData(true, inWire.bytes(), out -> replyToBootstrap(valueIn, inputTid));
            }
        }

        /** Reads the time and the {@code id} field and records them on the replication. */
        private void onLastUpdateTime(@NotNull WireIn inWire, ValueIn valueIn) {
            if (Jvm.isDebug() && LOG.isDebugEnabled()) {
                LOG.debug("server : received lastUpdateTime");
            }
            final long time = valueIn.int64();
            final byte id = inWire.read(() -> "id").int8();
            replication.setLastModificationTime(id, time);
        }

        /** Un-marshals a replicated entry into the thread-local instance and applies it. */
        private void onReplicationEvent(ValueIn valueIn) {
            final boolean debug = Jvm.isDebug() && LOG.isDebugEnabled();
            if (debug) {
                LOG.debug("server : received replicationEvent");
            }
            final CMap2EngineReplicator.VanillaReplicatedEntry replicatedEntry = vre.get();
            valueIn.marshallable((ReadMarshallable) replicatedEntry);
            if (debug) {
                LOG.debug("*****\t\t\t\t ->  RECEIVED : SERVER : replication latency={}ms  ",
                        System.currentTimeMillis() - replicatedEntry.timestamp());
            }
            replication.applyReplication(replicatedEntry);
        }
    };

    /**
     * Handles one inbound replication message.
     *
     * <p>The arguments are stored in fields, replacing those of any previous call, and the
     * message is then dispatched on its event name.
     *
     * @param inWire      wire positioned at the event name of the message
     * @param publisher   publisher that registered event handlers write outbound events to
     * @param tid         transaction id written back in front of replies
     * @param outWire     wire that direct replies are written to
     * @param hostId      identifier of this host
     * @param replication replication the received events are applied to
     * @param eventLoop   event loop that handlers created for bootstrap requests are added to
     */
    void process(@NotNull WireIn inWire, WireOutPublisher publisher, long tid, Wire outWire, HostIdentifier hostId, Replication replication, EventLoop eventLoop) {
        this.eventLoop = eventLoop;
        this.setOutWire((WireOut) outWire);
        this.hostId = hostId;
        this.publisher = publisher;
        this.replication = replication;
        this.tid = tid;
        this.dataConsumer.accept(inWire, tid);
    }

    /**
     * Answers a bootstrap request: marks the entries modified since the requester's last
     * update time as dirty, writes this host's own {@link Bootstrap} to {@code outWire} and
     * registers a {@link ReplicationEventHandler} for the requester.
     *
     * <p>Nothing is written when the request carries no bootstrap, and no handler is
     * registered when no modification iterator is available for the requester.
     *
     * @param valueIn  value positioned at the typed {@link Bootstrap} of the request
     * @param inputTid transaction id that published entries are tagged with
     */
    private void replyToBootstrap(ValueIn valueIn, Long inputTid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("server : received bootstrap request");
        }
        final Bootstrap inBootstrap = (Bootstrap) valueIn.typedMarshallable();
        if (inBootstrap == null) {
            return;
        }

        final byte id = inBootstrap.identifier();
        final EngineReplication.ModificationIterator mi = replication.acquireModificationIterator(id);
        if (mi != null) {
            mi.dirtyEntries(inBootstrap.lastUpdatedTime());
        }

        final Bootstrap outBootstrap = new Bootstrap();
        outBootstrap.identifier(hostId.hostId());
        outBootstrap.lastUpdatedTime(replication.lastModificationTime(id));
        outWire.writeEventName(EventId.bootstrap).typedMarshallable((WriteMarshallable) outBootstrap);

        if (Jvm.isDebug() && LOG.isDebugEnabled()) {
            LOG.debug("server : received replicationSubscribe");
        }
        if (mi == null) {
            return;
        }

        // Deliberately a lambda rather than eventLoop::unpause: the field is read on each
        // notification instead of being bound once here.
        mi.setModificationNotifier(() -> eventLoop.unpause());
        eventLoop.addHandler(true, new ReplicationEventHandler(mi, id, inputTid));
    }

    /**
     * Event-loop task that drains a {@link EngineReplication.ModificationIterator} into the
     * handler's current {@link WireOutPublisher}, one entry per {@link #action()} call.
     */
    private class ReplicationEventHandler implements EventHandler {
        private final EngineReplication.ModificationIterator mi;
        /** Identifier of the remote host this handler publishes to. */
        private final byte id;
        /** Transaction id written in front of every published entry. */
        private final long inputTid;
        /** True once the current {@link #lastUpdateTime} has been published. */
        private boolean hasSentLastUpdateTime;
        /** Largest entry timestamp published so far, or 0 when none has been. */
        private long lastUpdateTime;
        /** Ensures the "received ALL" message is logged only once. */
        private boolean hasLogged;
        /** Number of slow-send latency messages logged so far. */
        private int count;
        /** Time the publisher was first seen full, or 0 when no measurement is pending. */
        private long startBufferFullTimeStamp;

        public ReplicationEventHandler(EngineReplication.ModificationIterator mi, byte id, long inputTid) {
            this.mi = mi;
            this.id = id;
            this.inputTid = inputTid;
        }

        @NotNull
        public HandlerPriority priority() {
            return HandlerPriority.REPLICATION;
        }

        /**
         * Publishes at most one modified entry.
         *
         * @return {@code true} if an entry was handed to the publisher, {@code false} if the
         *         publisher cannot take more data or the iterator has nothing to offer
         * @throws InvalidEventHandlerException once the connection has been closed
         */
        public boolean action() throws InvalidEventHandlerException {
            if (connectionClosed) {
                throw new InvalidEventHandlerException();
            }

            final WireOutPublisher out = ReplicationHandler.this.publisher;
            synchronized (out) {
                if (!out.canTakeMoreData()) {
                    if (startBufferFullTimeStamp == 0L) {
                        startBufferFullTimeStamp = System.currentTimeMillis();
                    }
                    return false;
                }
                if (!mi.hasNext()) {
                    reportBufferFullTime();
                    publishLastUpdateTime(out);
                    return false;
                }
                mi.nextEntry(e -> out.put(null, wireOut -> {
                    // Skip entries whose remote identifier is this host's own id.
                    if (e.remoteIdentifier() == hostId.hostId()) {
                        return;
                    }
                    final long timestamp = e.timestamp();
                    if (timestamp > lastUpdateTime) {
                        // A newer timestamp has to be announced again once drained.
                        hasSentLastUpdateTime = false;
                        lastUpdateTime = timestamp;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("publish from server response from iterator localIdentifier={} ,remoteIdentifier={} event={}",
                                hostId, id, e);
                    }
                    wireOut.writeNotReadyDocument(true,
                            wire -> wire.writeEventName(CoreFields.tid).int64(inputTid));
                    if (LOG.isInfoEnabled()) {
                        final long delay = System.currentTimeMillis() - e.timestamp();
                        if (delay > 60L) {
                            LOG.info("Snt Srv latency={}ms\t", delay);
                            if (count++ % 10 == 1) {
                                LOG.info("");
                            }
                        }
                    }
                    if (wireOut.bytes().writePosition() > 100000L && LOG.isDebugEnabled()) {
                        LOG.debug(wireOut.bytes().toDebugString(128L));
                    }
                    wireOut.writeNotReadyDocument(false,
                            wire -> wire.writeEventName(EventId.replicationEvent).typedMarshallable((WriteMarshallable) e));
                }));
            }
            return true;
        }

        /**
         * Logs how long ago the publisher was first seen full, if that is more than 100 ms,
         * and clears the measurement. Only called once the iterator is empty, so the
         * reported time can include time spent publishing entries in between.
         */
        private void reportBufferFullTime() {
            if (startBufferFullTimeStamp == 0L) {
                return;
            }
            final long timetaken = System.currentTimeMillis() - startBufferFullTimeStamp;
            if (timetaken > 100L) {
                LOG.info("blocked - outbound buffer full={}ms", timetaken);
            }
            startBufferFullTimeStamp = 0L;
        }

        /**
         * Publishes the latest update time together with the remote id, unless it has been
         * sent already or no entry has been published yet.
         */
        private void publishLastUpdateTime(WireOutPublisher out) {
            if (hasSentLastUpdateTime || lastUpdateTime <= 0L) {
                return;
            }
            out.put(null, publish -> publish.writeNotReadyDocument(false, wire -> {
                wire.writeEventName(CoreFields.lastUpdateTime).int64(lastUpdateTime);
                wire.write(() -> "id").int8(id);
            }));
            hasSentLastUpdateTime = true;
            if (!hasLogged) {
                LOG.info("received ALL replication the EVENTS for id={}", id);
                hasLogged = true;
            }
        }
    }

    /** Event names understood by the replication handler; none of them takes parameters. */
    public enum EventId implements ParameterizeWireKey {
        publish,
        onEndOfSubscription,
        apply,
        replicationEvent,
        bootstrap,
        identifierReply,
        identifier;

        private final WireKey[] params;

        @SafeVarargs
        private <P extends WireKey> EventId(P... params) {
            this.params = params;
        }

        /**
         * Returns the parameter keys this event was declared with.
         *
         * @return the array passed to the constructor (not a copy)
         */
        @NotNull
        @SuppressWarnings("unchecked") // the generic return type requires a cast from WireKey[]
        public <P extends WireKey> P[] params() {
            return (P[]) this.params;
        }
    }
}

