package org.apache.flink.cep.nfa.sharedbuffer;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.cep.nfa.sharedbuffer.EventId;
import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.util.WrappingRuntimeException;

/* loaded from: input_file:org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.class */
public class SharedBuffer<V> {
    private static final String entriesStateName = "sharedBuffer-entries";
    private static final String eventsStateName = "sharedBuffer-events";
    private static final String eventsCountStateName = "sharedBuffer-events-count";
    private MapState<EventId, Lockable<V>> eventsBuffer;
    private MapState<Long, Integer> eventsCount;
    private MapState<NodeId, Lockable<SharedBufferNode>> entries;
    private Map<EventId, Lockable<V>> eventsBufferCache = new HashMap();
    private Map<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap();

    public SharedBuffer(KeyedStateStore keyedStateStore, TypeSerializer<V> typeSerializer) {
        this.eventsBuffer = keyedStateStore.getMapState(new MapStateDescriptor(eventsStateName, EventId.EventIdSerializer.INSTANCE, new Lockable.LockableTypeSerializer(typeSerializer)));
        this.entries = keyedStateStore.getMapState(new MapStateDescriptor(entriesStateName, new NodeId.NodeIdSerializer(), new Lockable.LockableTypeSerializer(new SharedBufferNode.SharedBufferNodeSerializer())));
        this.eventsCount = keyedStateStore.getMapState(new MapStateDescriptor(eventsCountStateName, LongSerializer.INSTANCE, IntSerializer.INSTANCE));
    }

    @Deprecated
    public void init(Map<EventId, Lockable<V>> map, Map<NodeId, Lockable<SharedBufferNode>> map2) throws Exception {
        this.eventsBuffer.putAll(map);
        this.entries.putAll(map2);
        this.eventsCount.putAll((Map) map.keySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getTimestamp();
        }, (v0) -> {
            return v0.getId();
        }, (v0, v1) -> {
            return Math.max(v0, v1);
        })));
    }

    public SharedBufferAccessor<V> getAccessor() {
        return new SharedBufferAccessor<>(this);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void advanceTime(long j) throws Exception {
        Iterator it = this.eventsCount.keys().iterator();
        while (it.hasNext()) {
            if (((Long) it.next()).longValue() < j) {
                it.remove();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventId registerEvent(V v, long j) throws Exception {
        Integer num = (Integer) this.eventsCount.get(Long.valueOf(j));
        if (num == null) {
            num = 0;
        }
        EventId eventId = new EventId(num.intValue(), j);
        Lockable<V> lockable = new Lockable<>(v, 1);
        this.eventsCount.put(Long.valueOf(j), Integer.valueOf(num.intValue() + 1));
        this.eventsBufferCache.put(eventId, lockable);
        return eventId;
    }

    public boolean isEmpty() throws Exception {
        return Iterables.isEmpty(this.eventsBufferCache.keySet()) && Iterables.isEmpty(this.eventsBuffer.keys());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void upsertEvent(EventId eventId, Lockable<V> lockable) {
        this.eventsBufferCache.put(eventId, lockable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void upsertEntry(NodeId nodeId, Lockable<SharedBufferNode> lockable) {
        this.entryCache.put(nodeId, lockable);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeEvent(EventId eventId) throws Exception {
        this.eventsBufferCache.remove(eventId);
        this.eventsBuffer.remove(eventId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeEntry(NodeId nodeId) throws Exception {
        this.entryCache.remove(nodeId);
        this.entries.remove(nodeId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Lockable<SharedBufferNode> getEntry(NodeId nodeId) {
        return this.entryCache.computeIfAbsent(nodeId, nodeId2 -> {
            try {
                return (Lockable) this.entries.get(nodeId2);
            } catch (Exception e) {
                throw new WrappingRuntimeException(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Lockable<V> getEvent(EventId eventId) {
        return this.eventsBufferCache.computeIfAbsent(eventId, eventId2 -> {
            try {
                return (Lockable) this.eventsBuffer.get(eventId2);
            } catch (Exception e) {
                throw new WrappingRuntimeException(e);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flushCache() throws Exception {
        if (!this.entryCache.isEmpty()) {
            this.entries.putAll(this.entryCache);
            this.entryCache.clear();
        }
        if (this.eventsBufferCache.isEmpty()) {
            return;
        }
        this.eventsBuffer.putAll(this.eventsBufferCache);
        this.eventsBufferCache.clear();
    }

    @VisibleForTesting
    Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
        return this.eventsCount.iterator();
    }

    @VisibleForTesting
    public int getEventsBufferCacheSize() {
        return this.eventsBufferCache.size();
    }

    @VisibleForTesting
    public int getEventsBufferSize() throws Exception {
        return Iterables.size(this.eventsBuffer.entries());
    }

    @VisibleForTesting
    public int getSharedBufferNodeSize() throws Exception {
        return Iterables.size(this.entries.entries());
    }

    @VisibleForTesting
    public int getSharedBufferNodeCacheSize() throws Exception {
        return this.entryCache.size();
    }
}
