/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A non-persistent {@link WindowStore} holding serialized keys and values in memory.
 *
 * <p>Records are grouped into "segments": one sorted key/value map per window start timestamp,
 * all held in a sorted map keyed by that timestamp. Segments whose timestamp has fallen out of
 * the retention period (relative to the highest window start timestamp seen by
 * {@link #put(Bytes, byte[], long)}) are dropped lazily at the start of every read and write,
 * except that segments at or after the current position of any open iterator are kept.
 *
 * <p>When {@code retainDuplicates} is set, every stored key is suffixed with a 4-byte sequence
 * number so that several values can coexist under the same key and timestamp.
 */
public class InMemoryWindowStore implements WindowStore<Bytes, byte[]> {

    private static final Logger LOG = LoggerFactory.getLogger(InMemoryWindowStore.class);

    /** Number of bytes appended to a key by {@link #wrapForDups(Bytes, int)}. */
    private static final int SEQNUM_SIZE = 4;

    private final String name;
    private final String metricScope;
    private final long retentionPeriod;
    private final long windowSize;
    private final boolean retainDuplicates;

    /** Window start timestamp -> (stored key -> value). */
    private final ConcurrentNavigableMap<Long, ConcurrentNavigableMap<Bytes, byte[]>> segmentMap =
        new ConcurrentSkipListMap<>();

    /** Iterators handed out and not yet closed; they hold back segment expiry. */
    private final Set<InMemoryWindowStoreIteratorWrapper> openIterators = ConcurrentHashMap.newKeySet();

    private InternalProcessorContext context;
    private Sensor expiredRecordSensor;
    private int seqnum = 0;
    private long observedStreamTime = -1L;
    private volatile boolean open = false;

    /**
     * @param name             store name, returned by {@link #name()} and used in metric tags
     * @param retentionPeriod  how far behind the observed stream time a window start may lag
     *                         before it is treated as expired (same unit as the timestamps)
     * @param windowSize       added to a window start timestamp to derive the window end
     * @param retainDuplicates whether to keep multiple values per key and timestamp
     * @param metricScope      scope string used to build the metric group and tag names
     */
    InMemoryWindowStore(final String name,
                        final long retentionPeriod,
                        final long windowSize,
                        final boolean retainDuplicates,
                        final String metricScope) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.windowSize = windowSize;
        this.retainDuplicates = retainDuplicates;
        this.metricScope = metricScope;
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Creates the expired-record sensor, registers a restore callback for {@code root}
     * (when non-null) and marks the store as open.
     *
     * @param context must be an {@link InternalProcessorContext}; it is cast unconditionally
     * @param root    the store to register with the context, or {@code null} to skip registration
     */
    @Override
    public void init(final ProcessorContext context, final StateStore root) {
        this.context = (InternalProcessorContext) context;

        final StreamsMetricsImpl metrics = this.context.metrics();
        final String threadId = Thread.currentThread().getName();
        final String taskName = context.taskId().toString();

        expiredRecordSensor = metrics.storeLevelSensor(
            threadId,
            taskName,
            name(),
            "expired-window-record-drop",
            Sensor.RecordingLevel.INFO,
            new Sensor[0]
        );
        StreamsMetricsImpl.addInvocationRateAndCountToSensor(
            expiredRecordSensor,
            "stream-" + metricScope + "-metrics",
            metrics.tagMap(threadId, "task-id", taskName, metricScope + "-id", name()),
            "expired-window-record-drop"
        );

        if (root != null) {
            // Restored records carry the store key and the window timestamp inside the raw key.
            context.register(root, (key, value) -> put(
                Bytes.wrap(WindowKeySchema.extractStoreKeyBytes(key)),
                value,
                WindowKeySchema.extractStoreTimestamp(key)));
        }
        open = true;
    }

    /**
     * Stores {@code value} under {@code key} using the context's current timestamp as the
     * window start.
     */
    @Override
    @Deprecated
    public void put(final Bytes key, final byte[] value) {
        put(key, value, context.timestamp());
    }

    /**
     * Inserts, or (for a {@code null} value) deletes, the entry for {@code key} in the segment
     * starting at {@code windowStartTimestamp}.
     *
     * <p>Records whose window start is at or before {@code observedStreamTime - retentionPeriod}
     * are not stored; the expired-record sensor is recorded and a warning is logged instead.
     *
     * @param key                  record key
     * @param value                record value, or {@code null} to remove the entry
     * @param windowStartTimestamp start timestamp of the window the record belongs to
     */
    @Override
    public void put(final Bytes key, final byte[] value, final long windowStartTimestamp) {
        removeExpiredSegments();
        maybeUpdateSeqnumForDups();
        observedStreamTime = Math.max(observedStreamTime, windowStartTimestamp);

        final Bytes keyBytes = retainDuplicates ? wrapForDups(key, seqnum) : key;

        if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
            expiredRecordSensor.record();
            LOG.warn("Skipping record for expired segment.");
        } else if (value != null) {
            // Use the map returned by computeIfAbsent directly instead of a second lookup.
            segmentMap
                .computeIfAbsent(windowStartTimestamp, t -> new ConcurrentSkipListMap<>())
                .put(keyBytes, value);
        } else {
            // Returning null removes the (now empty) segment without mutating segmentMap
            // from inside its own remapping function.
            segmentMap.computeIfPresent(windowStartTimestamp, (t, kvMap) -> {
                kvMap.remove(keyBytes);
                return kvMap.isEmpty() ? null : kvMap;
            });
        }
    }

    /**
     * Looks up the value stored for {@code key} in the segment starting at
     * {@code windowStartTimestamp}.
     *
     * @return the value, or {@code null} if the window is expired or no such entry exists
     * @throws NullPointerException if {@code key} is null
     */
    @Override
    public byte[] fetch(final Bytes key, final long windowStartTimestamp) {
        Objects.requireNonNull(key, "key cannot be null");
        removeExpiredSegments();

        if (windowStartTimestamp <= observedStreamTime - retentionPeriod) {
            return null;
        }

        final ConcurrentNavigableMap<Bytes, byte[]> kvMap = segmentMap.get(windowStartTimestamp);
        return kvMap == null ? null : kvMap.get(key);
    }

    /**
     * Iterates over the values of {@code key} in all non-expired segments whose start timestamp
     * lies in {@code [timeFrom, timeTo]}. The returned iterator is tracked by this store until
     * it is closed.
     *
     * @throws NullPointerException if {@code key} is null
     */
    @Override
    @Deprecated
    public WindowStoreIterator<byte[]> fetch(final Bytes key, final long timeFrom, final long timeTo) {
        Objects.requireNonNull(key, "key cannot be null");
        removeExpiredSegments();

        // Never look further back than the first timestamp still inside the retention period.
        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1L);
        if (timeTo < minTime) {
            return WrappedInMemoryWindowStoreIterator.emptyIterator();
        }

        return registerNewWindowStoreIterator(
            key, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
    }

    /**
     * Iterates over all entries with a key in {@code [from, to]} in all non-expired segments
     * whose start timestamp lies in {@code [timeFrom, timeTo]}. An empty iterator is returned
     * (and a warning logged) when {@code from} compares greater than {@code to}.
     *
     * @throws NullPointerException if {@code from} or {@code to} is null
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(final Bytes from,
                                                           final Bytes to,
                                                           final long timeFrom,
                                                           final long timeTo) {
        Objects.requireNonNull(from, "from key cannot be null");
        Objects.requireNonNull(to, "to key cannot be null");
        removeExpiredSegments();

        if (from.compareTo(to) > 0) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1L);
        if (timeTo < minTime) {
            return KeyValueIterators.emptyIterator();
        }

        return registerNewWindowedKeyValueIterator(
            from, to, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
    }

    /**
     * Iterates over every entry in all non-expired segments whose start timestamp lies in
     * {@code [timeFrom, timeTo]}.
     */
    @Override
    @Deprecated
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(final long timeFrom, final long timeTo) {
        removeExpiredSegments();

        final long minTime = Math.max(timeFrom, observedStreamTime - retentionPeriod + 1L);
        if (timeTo < minTime) {
            return KeyValueIterators.emptyIterator();
        }

        return registerNewWindowedKeyValueIterator(
            null, null, segmentMap.subMap(minTime, true, timeTo, true).entrySet().iterator());
    }

    /**
     * Iterates over every entry in every segment whose start timestamp is strictly greater than
     * {@code observedStreamTime - retentionPeriod}.
     */
    @Override
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        removeExpiredSegments();

        final long minTime = observedStreamTime - retentionPeriod;
        return registerNewWindowedKeyValueIterator(
            null, null, segmentMap.tailMap(minTime, false).entrySet().iterator());
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }

    /** No-op: there is nothing to flush for this store. */
    @Override
    public void flush() {
    }

    /**
     * Closes any iterators still open (logging a warning if there are any), discards all
     * segments and marks the store as closed.
     */
    @Override
    public void close() {
        if (!openIterators.isEmpty()) {
            LOG.warn("Closing {} open iterators for store {}", openIterators.size(), name);
            // Each close() deregisters the iterator from openIterators while we iterate over it.
            for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
                it.close();
            }
        }
        segmentMap.clear();
        open = false;
    }

    /**
     * Drops every segment that starts before the earliest timestamp still inside the retention
     * period, but never a segment at or after the current position of an open iterator.
     */
    private void removeExpiredSegments() {
        long minLiveTime = Math.max(0L, observedStreamTime - retentionPeriod + 1L);
        for (final InMemoryWindowStoreIteratorWrapper it : openIterators) {
            minLiveTime = Math.min(minLiveTime, it.minTime());
        }
        segmentMap.headMap(minLiveTime, false).clear();
    }

    /** Advances the duplicate sequence number, wrapping within the non-negative int range. */
    private void maybeUpdateSeqnumForDups() {
        if (retainDuplicates) {
            seqnum = (seqnum + 1) & Integer.MAX_VALUE;
        }
    }

    /** Returns {@code key} with {@code seqnum} appended as {@link #SEQNUM_SIZE} bytes. */
    private static Bytes wrapForDups(final Bytes key, final int seqnum) {
        final ByteBuffer buf = ByteBuffer.allocate(key.get().length + SEQNUM_SIZE);
        buf.put(key.get());
        buf.putInt(seqnum);
        return Bytes.wrap(buf.array());
    }

    /** Returns {@code keyBytes} without its trailing {@link #SEQNUM_SIZE} sequence-number bytes. */
    private static Bytes getKey(final Bytes keyBytes) {
        final byte[] bytes = new byte[keyBytes.get().length - SEQNUM_SIZE];
        System.arraycopy(keyBytes.get(), 0, bytes, 0, bytes.length);
        return Bytes.wrap(bytes);
    }

    /**
     * Builds a single-key iterator over {@code segmentIterator} and tracks it in
     * {@link #openIterators} until it is closed.
     */
    private WrappedInMemoryWindowStoreIterator registerNewWindowStoreIterator(
            final Bytes key,
            final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
        // With duplicates, a single key spans every sequence number from 0 to Integer.MAX_VALUE.
        final Bytes keyFrom = retainDuplicates ? wrapForDups(key, 0) : key;
        final Bytes keyTo = retainDuplicates ? wrapForDups(key, Integer.MAX_VALUE) : key;

        final WrappedInMemoryWindowStoreIterator iterator = new WrappedInMemoryWindowStoreIterator(
            keyFrom, keyTo, segmentIterator, openIterators::remove, retainDuplicates);
        openIterators.add(iterator);
        return iterator;
    }

    /**
     * Builds a key-range iterator (or an all-keys iterator when both bounds are {@code null})
     * over {@code segmentIterator} and tracks it in {@link #openIterators} until it is closed.
     */
    private WrappedWindowedKeyValueIterator registerNewWindowedKeyValueIterator(
            final Bytes keyFrom,
            final Bytes keyTo,
            final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator) {
        final Bytes from = retainDuplicates && keyFrom != null ? wrapForDups(keyFrom, 0) : keyFrom;
        final Bytes to = retainDuplicates && keyTo != null ? wrapForDups(keyTo, Integer.MAX_VALUE) : keyTo;

        final WrappedWindowedKeyValueIterator iterator = new WrappedWindowedKeyValueIterator(
            from, to, segmentIterator, openIterators::remove, retainDuplicates, windowSize);
        openIterators.add(iterator);
        return iterator;
    }

    /** Iterator yielding {@code (windowed key, value)} pairs across segments. */
    private static class WrappedWindowedKeyValueIterator
            extends InMemoryWindowStoreIteratorWrapper
            implements KeyValueIterator<Windowed<Bytes>, byte[]> {

        private final long windowSize;

        WrappedWindowedKeyValueIterator(
                final Bytes keyFrom,
                final Bytes keyTo,
                final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
                final ClosingCallback callback,
                final boolean retainDuplicates,
                final long windowSize) {
            super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
            this.windowSize = windowSize;
        }

        /**
         * @return the windowed key of the next entry without consuming it
         * @throws NoSuchElementException if there is no next entry
         */
        @Override
        public Windowed<Bytes> peekNextKey() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return getWindowedKey();
        }

        /**
         * @return the next entry, consuming it
         * @throws NoSuchElementException if there is no next entry
         */
        @Override
        public KeyValue<Windowed<Bytes>, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final KeyValue<Windowed<Bytes>, byte[]> result = new KeyValue<>(getWindowedKey(), super.next.value);
            super.next = null;
            return result;
        }

        /** Builds the windowed key for the buffered entry, stripping any duplicate suffix. */
        private Windowed<Bytes> getWindowedKey() {
            final Bytes key = super.retainDuplicates ? getKey(super.next.key) : super.next.key;

            long endTime = super.currentTime + windowSize;
            if (endTime < 0L) {
                // start + size overflowed; clamp to the largest representable end time
                LOG.warn("Warning: window end time was truncated to Long.MAX");
                endTime = Long.MAX_VALUE;
            }

            return new Windowed<>(key, new TimeWindow(super.currentTime, endTime));
        }
    }

    /** Iterator yielding {@code (window start timestamp, value)} pairs for a single key. */
    private static class WrappedInMemoryWindowStoreIterator
            extends InMemoryWindowStoreIteratorWrapper
            implements WindowStoreIterator<byte[]> {

        WrappedInMemoryWindowStoreIterator(
                final Bytes keyFrom,
                final Bytes keyTo,
                final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
                final ClosingCallback callback,
                final boolean retainDuplicates) {
            super(keyFrom, keyTo, segmentIterator, callback, retainDuplicates);
        }

        /**
         * @return the window start timestamp of the next entry without consuming it
         * @throws NoSuchElementException if there is no next entry
         */
        @Override
        public Long peekNextKey() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return super.currentTime;
        }

        /**
         * @return the next entry, consuming it
         * @throws NoSuchElementException if there is no next entry
         */
        @Override
        public KeyValue<Long, byte[]> next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            final KeyValue<Long, byte[]> result = new KeyValue<>(super.currentTime, super.next.value);
            super.next = null;
            return result;
        }

        /** Returns an iterator with no segments and a no-op closing callback. */
        public static WrappedInMemoryWindowStoreIterator emptyIterator() {
            return new WrappedInMemoryWindowStoreIterator(null, null, null, it -> { }, false);
        }
    }

    /**
     * Shared traversal logic: walks the supplied segments in order and, within each segment,
     * the entries between {@code keyFrom} and {@code keyTo} (or all entries when both bounds
     * are {@code null}), buffering one entry ahead in {@code next}.
     */
    private abstract static class InMemoryWindowStoreIteratorWrapper {

        private final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator;
        private Iterator<Map.Entry<Bytes, byte[]>> recordIterator;
        private KeyValue<Bytes, byte[]> next;
        private long currentTime;

        private final boolean allKeys;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        // Range bounds with the duplicate suffix stripped; only set when filtering is needed.
        private final Bytes unwrappedKeyFrom;
        private final Bytes unwrappedKeyTo;
        private final boolean retainDuplicates;
        private final ClosingCallback callback;

        /**
         * @param keyFrom          inclusive lower key bound as stored (i.e. already wrapped when
         *                         duplicates are retained), or {@code null}
         * @param keyTo            inclusive upper key bound as stored, or {@code null}
         * @param segmentIterator  segments to traverse, or {@code null} for an empty iterator
         * @param callback         invoked with this iterator from {@link #close()}
         * @param retainDuplicates whether stored keys carry a sequence-number suffix
         */
        InMemoryWindowStoreIteratorWrapper(
                final Bytes keyFrom,
                final Bytes keyTo,
                final Iterator<Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>>> segmentIterator,
                final ClosingCallback callback,
                final boolean retainDuplicates) {
            this.keyFrom = keyFrom;
            this.keyTo = keyTo;
            this.allKeys = keyFrom == null && keyTo == null;
            this.retainDuplicates = retainDuplicates;
            this.segmentIterator = segmentIterator;
            this.callback = callback;

            final boolean filterByUnwrappedKey = retainDuplicates && !allKeys;
            this.unwrappedKeyFrom = filterByUnwrappedKey ? getKey(keyFrom) : null;
            this.unwrappedKeyTo = filterByUnwrappedKey ? getKey(keyTo) : null;

            this.recordIterator = segmentIterator == null ? null : setRecordIterator();
        }

        /**
         * Buffers the next matching entry if there is one.
         *
         * <p>When duplicates are retained and a key range is set, entries returned by the
         * wrapped-key sub-range are additionally checked against the unwrapped bounds and
         * skipped when they fall outside. Skipping is done in a loop so that a long run of
         * non-matching entries cannot exhaust the call stack.
         *
         * @return {@code true} if an entry is buffered; {@code false} if the iterator is
         *         exhausted or closed
         */
        public boolean hasNext() {
            if (next != null) {
                return true;
            }

            while (recordIterator != null && (recordIterator.hasNext() || segmentIterator.hasNext())) {
                final KeyValue<Bytes, byte[]> candidate = getNext();
                if (candidate == null) {
                    return false;
                }
                if (allKeys || !retainDuplicates || isWithinUnwrappedRange(candidate.key)) {
                    next = candidate;
                    return true;
                }
            }
            return false;
        }

        /** Releases the buffered state and deregisters this iterator via the callback. */
        public void close() {
            next = null;
            recordIterator = null;
            callback.deregisterIterator(this);
        }

        /** Whether the stored key, with its suffix stripped, lies within the unwrapped bounds. */
        private boolean isWithinUnwrappedRange(final Bytes storedKey) {
            final Bytes key = getKey(storedKey);
            return key.compareTo(unwrappedKeyFrom) >= 0 && key.compareTo(unwrappedKeyTo) <= 0;
        }

        /**
         * Returns the next entry in traversal order, moving on to following segments as the
         * current one runs out, or {@code null} once no segment is left.
         */
        private KeyValue<Bytes, byte[]> getNext() {
            while (!recordIterator.hasNext()) {
                recordIterator = setRecordIterator();
                if (recordIterator == null) {
                    return null;
                }
            }
            final Map.Entry<Bytes, byte[]> nextRecord = recordIterator.next();
            return new KeyValue<>(nextRecord.getKey(), nextRecord.getValue());
        }

        /**
         * Advances to the next segment, records its timestamp as the current time and returns
         * an iterator over its entries within the key bounds; {@code null} if no segment is left.
         */
        private Iterator<Map.Entry<Bytes, byte[]>> setRecordIterator() {
            if (!segmentIterator.hasNext()) {
                return null;
            }

            final Map.Entry<Long, ConcurrentNavigableMap<Bytes, byte[]>> currentSegment = segmentIterator.next();
            currentTime = currentSegment.getKey();

            if (allKeys) {
                return currentSegment.getValue().entrySet().iterator();
            }
            return currentSegment.getValue().subMap(keyFrom, true, keyTo, true).entrySet().iterator();
        }

        /** Timestamp of the segment this iterator is currently positioned on. */
        Long minTime() {
            return currentTime;
        }
    }

    /** Hook invoked when an iterator is closed so that its owner can stop tracking it. */
    interface ClosingCallback {
        void deregisterIterator(InMemoryWindowStoreIteratorWrapper iterator);
    }
}

