package org.apache.kafka.streams.state.internals;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingWindowStore.class */
public class CachingWindowStore extends WrappedStateStore<WindowStore<Bytes, byte[]>, byte[], byte[]> implements WindowStore<Bytes, byte[]>, CachedStateStore<byte[], byte[]> {
    private static final Logger LOG = LoggerFactory.getLogger(CachingWindowStore.class);
    private final long windowSize;
    private final SegmentedCacheFunction cacheFunction;
    private final SegmentedBytesStore.KeySchema keySchema;
    private String cacheName;
    private boolean sendOldValues;
    private InternalProcessorContext<?, ?> internalContext;
    private StateSerdes<Bytes, byte[]> bytesSerdes;
    private CacheFlushListener<byte[], byte[]> flushListener;
    private final AtomicLong maxObservedTimestamp;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingWindowStore$CacheIteratorWrapper.class */
    public class CacheIteratorWrapper implements PeekingKeyValueIterator<Bytes, LRUCacheEntry> {
        private final long segmentInterval;
        private final Bytes keyFrom;
        private final Bytes keyTo;
        private final long timeTo;
        private final boolean forward;
        private long lastSegmentId;
        private long currentSegmentId;
        private Bytes cacheKeyFrom;
        private Bytes cacheKeyTo;
        private ThreadCache.MemoryLRUCacheBytesIterator current;

        private CacheIteratorWrapper(CachingWindowStore cachingWindowStore, Bytes bytes, long j, long j2, boolean z) {
            this(bytes, bytes, j, j2, z);
        }

        private CacheIteratorWrapper(Bytes bytes, Bytes bytes2, long j, long j2, boolean z) {
            this.keyFrom = bytes;
            this.keyTo = bytes2;
            this.timeTo = j2;
            this.forward = z;
            this.segmentInterval = CachingWindowStore.this.cacheFunction.getSegmentInterval();
            if (z) {
                this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(j2, CachingWindowStore.this.maxObservedTimestamp.get()));
                this.currentSegmentId = CachingWindowStore.this.cacheFunction.segmentId(j);
                setCacheKeyRange(j, currentSegmentLastTime());
                this.current = CachingWindowStore.this.internalContext.cache().range(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
                return;
            }
            this.currentSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(j2, CachingWindowStore.this.maxObservedTimestamp.get()));
            this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(j);
            setCacheKeyRange(currentSegmentBeginTime(), Math.min(j2, CachingWindowStore.this.maxObservedTimestamp.get()));
            this.current = CachingWindowStore.this.internalContext.cache().reverseRange(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.current == null) {
                return false;
            }
            if (this.current.hasNext()) {
                return true;
            }
            while (!this.current.hasNext()) {
                getNextSegmentIterator();
                if (this.current == null) {
                    return false;
                }
            }
            return true;
        }

        @Override // org.apache.kafka.streams.state.KeyValueIterator
        public Bytes peekNextKey() {
            if (hasNext()) {
                return this.current.peekNextKey();
            }
            throw new NoSuchElementException();
        }

        @Override // org.apache.kafka.streams.state.internals.PeekingKeyValueIterator
        public KeyValue<Bytes, LRUCacheEntry> peekNext() {
            if (hasNext()) {
                return this.current.peekNext();
            }
            throw new NoSuchElementException();
        }

        @Override // java.util.Iterator
        public KeyValue<Bytes, LRUCacheEntry> next() {
            if (hasNext()) {
                return this.current.next();
            }
            throw new NoSuchElementException();
        }

        @Override // org.apache.kafka.streams.state.KeyValueIterator, java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.current.close();
        }

        private long currentSegmentBeginTime() {
            return this.currentSegmentId * this.segmentInterval;
        }

        private long currentSegmentLastTime() {
            return Math.min(this.timeTo, (currentSegmentBeginTime() + this.segmentInterval) - 1);
        }

        private void getNextSegmentIterator() {
            if (!this.forward) {
                this.currentSegmentId--;
                if (this.currentSegmentId < this.lastSegmentId) {
                    this.current = null;
                    return;
                }
                setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
                this.current.close();
                this.current = CachingWindowStore.this.internalContext.cache().reverseRange(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
                return;
            }
            this.currentSegmentId++;
            this.lastSegmentId = CachingWindowStore.this.cacheFunction.segmentId(Math.min(this.timeTo, CachingWindowStore.this.maxObservedTimestamp.get()));
            if (this.currentSegmentId > this.lastSegmentId) {
                this.current = null;
                return;
            }
            setCacheKeyRange(currentSegmentBeginTime(), currentSegmentLastTime());
            this.current.close();
            this.current = CachingWindowStore.this.internalContext.cache().range(CachingWindowStore.this.cacheName, this.cacheKeyFrom, this.cacheKeyTo);
        }

        private void setCacheKeyRange(long j, long j2) {
            if (CachingWindowStore.this.cacheFunction.segmentId(j) != CachingWindowStore.this.cacheFunction.segmentId(j2)) {
                throw new IllegalStateException("Error iterating over segments: segment interval has changed");
            }
            if (this.keyFrom == null || !this.keyFrom.equals(this.keyTo)) {
                this.cacheKeyFrom = this.keyFrom == null ? null : CachingWindowStore.this.cacheFunction.cacheKey(CachingWindowStore.this.keySchema.lowerRange(this.keyFrom, j), this.currentSegmentId);
                this.cacheKeyTo = this.keyTo == null ? null : CachingWindowStore.this.cacheFunction.cacheKey(CachingWindowStore.this.keySchema.upperRange(this.keyTo, this.timeTo), this.currentSegmentId);
            } else {
                this.cacheKeyFrom = CachingWindowStore.this.cacheFunction.cacheKey(segmentLowerRangeFixedSize(this.keyFrom, j));
                this.cacheKeyTo = CachingWindowStore.this.cacheFunction.cacheKey(segmentUpperRangeFixedSize(this.keyTo, j2));
            }
        }

        private Bytes segmentLowerRangeFixedSize(Bytes bytes, long j) {
            return WindowKeySchema.toStoreKeyBinary(bytes, Math.max(0L, j), 0);
        }

        private Bytes segmentUpperRangeFixedSize(Bytes bytes, long j) {
            return WindowKeySchema.toStoreKeyBinary(bytes, j, Integer.MAX_VALUE);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CachingWindowStore(WindowStore<Bytes, byte[]> windowStore, long j, long j2) {
        super(windowStore);
        this.keySchema = new WindowKeySchema();
        this.windowSize = j;
        this.cacheFunction = new SegmentedCacheFunction(this.keySchema, j2);
        this.maxObservedTimestamp = new AtomicLong(-1L);
    }

    /**
     * Initializes the caching layer and then the wrapped store.
     *
     * <p>Resolves the internal processor context, builds the byte serdes, derives the cache name
     * as {@code <taskId>-<storeName>} and registers a flush listener on that cache which hands
     * every dirty entry to {@code putAndMaybeForward}.
     *
     * @param stateStoreContext the context this store is initialized with
     * @param stateStore        the root store, forwarded unchanged to the superclass
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
        final String changelogTopic = ProcessorContextUtils.changelogFor(stateStoreContext, name(), Boolean.TRUE);
        this.internalContext = ProcessorContextUtils.asInternalProcessorContext(stateStoreContext);
        this.bytesSerdes = new StateSerdes<>(changelogTopic, Serdes.Bytes(), Serdes.ByteArray());
        this.cacheName = this.internalContext.taskId() + "-" + name();

        this.internalContext.registerCacheFlushListener(this.cacheName, dirtyEntries -> {
            for (final ThreadCache.DirtyEntry dirtyEntry : dirtyEntries) {
                putAndMaybeForward(dirtyEntry, this.internalContext);
            }
        });

        super.init(stateStoreContext, stateStore);
    }

    /**
     * Writes one dirty cache entry to the wrapped store and, if a flush listener is set, forwards
     * the change to it.
     *
     * <p>With a listener set, the previous value is fetched from the wrapped store when the new
     * value is {@code null} or old values were requested; if both the new and the previous value
     * are {@code null} nothing is written or forwarded.
     *
     * <p>While writing and forwarding, the processor's record context is switched to the context
     * stored with the cache entry. The previous record context is restored in a {@code finally}
     * block so it is reinstated even if the store write or the listener throws.
     *
     * @param dirtyEntry               the evicted/flushed cache entry
     * @param internalProcessorContext the context whose record context is temporarily replaced
     */
    private void putAndMaybeForward(ThreadCache.DirtyEntry dirtyEntry, InternalProcessorContext<?, ?> internalProcessorContext) {
        final byte[] binaryWindowKey = this.cacheFunction.key(dirtyEntry.key()).get();
        final Windowed<Bytes> windowedKey = WindowKeySchema.fromStoreBytesKey(binaryWindowKey, this.windowSize);
        final long windowStart = windowedKey.window().start();
        final Bytes key = windowedKey.key();

        final byte[] newValue = dirtyEntry.newValue();
        byte[] oldValue = null;
        if (this.flushListener != null) {
            if (newValue == null || this.sendOldValues) {
                oldValue = wrapped().fetch(key, windowStart);
            }
            if (newValue == null && oldValue == null) {
                // Key exists neither in the cache nor in the wrapped store: nothing to do.
                return;
            }
        }

        final ProcessorRecordContext previousContext = internalProcessorContext.recordContext();
        try {
            internalProcessorContext.setRecordContext(dirtyEntry.entry().context());
            wrapped().put(key, newValue, windowStart);
            if (this.flushListener != null) {
                this.flushListener.apply(new Record<>(
                    binaryWindowKey,
                    new Change<>(newValue, this.sendOldValues ? oldValue : null),
                    dirtyEntry.entry().context().timestamp(),
                    dirtyEntry.entry().context().headers()));
            }
        } finally {
            // Must run on the exceptional path too, otherwise the caller keeps the entry's context.
            internalProcessorContext.setRecordContext(previousContext);
        }
    }

    /**
     * Installs the listener that receives changes when dirty cache entries are flushed.
     *
     * @param cacheFlushListener the listener to notify; {@code null} disables forwarding
     * @param z                  whether the previous value should be included in forwarded changes
     * @return always {@code true}
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public boolean setFlushListener(CacheFlushListener<byte[], byte[]> cacheFlushListener, boolean z) {
        this.sendOldValues = z;
        this.flushListener = cacheFlushListener;
        return true;
    }

    /**
     * Puts a value into the cache only; the wrapped store is written when the entry is flushed.
     *
     * <p>The entry is stored as dirty together with the headers, offset, timestamp, partition and
     * topic currently exposed by the processor context. Afterwards {@code maxObservedTimestamp}
     * is raised to the segment timestamp of the stored key if that is larger.
     *
     * @param bytes the record key
     * @param bArr  the serialized value
     * @param j     the window start timestamp
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized void put(Bytes bytes, byte[] bArr, long j) {
        validateStoreOpen();

        final Bytes storeKey = WindowKeySchema.toStoreKeyBinary(bytes, j, 0);
        final Bytes cacheKey = this.cacheFunction.cacheKey(storeKey);
        final LRUCacheEntry dirtyEntry = new LRUCacheEntry(
            bArr,
            this.internalContext.headers(),
            true,
            this.internalContext.offset(),
            this.internalContext.timestamp(),
            this.internalContext.partition(),
            this.internalContext.topic());
        this.internalContext.cache().put(this.cacheName, cacheKey, dirtyEntry);

        final long keyTimestamp = this.keySchema.segmentTimestamp(storeKey);
        this.maxObservedTimestamp.set(Math.max(keyTimestamp, this.maxObservedTimestamp.get()));
    }

    /**
     * Point lookup of a single window.
     *
     * <p>Returns the cached value when the cache is available and holds an entry for the key;
     * otherwise falls back to the wrapped store.
     *
     * @param bytes the record key
     * @param j     the window start timestamp
     * @return the cached value if an entry exists, else whatever the wrapped store returns
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public byte[] fetch(Bytes bytes, long j) {
        validateStoreOpen();
        final Bytes cacheKey = this.cacheFunction.cacheKey(WindowKeySchema.toStoreKeyBinary(bytes, j, 0));

        if (this.internalContext.cache() == null) {
            return wrapped().fetch(bytes, j);
        }
        final LRUCacheEntry cached = this.internalContext.cache().get(this.cacheName, cacheKey);
        if (cached == null) {
            return wrapped().fetch(bytes, j);
        }
        return cached.value();
    }

    /**
     * Forward fetch of all windows of one key within a time range, combining cached entries with
     * the wrapped store's result.
     *
     * <p>If the context has no cache, the wrapped store's iterator is returned as is. For a
     * persistent wrapped store the cache is read segment by segment through
     * {@link CacheIteratorWrapper}; otherwise a single cache range is used.
     *
     * @param bytes the record key
     * @param j     lower time bound
     * @param j2    upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized WindowStoreIterator<byte[]> fetch(Bytes bytes, long j, long j2) {
        validateStoreOpen();
        // The key is passed as-is: it is a Bytes key, not a store, so it must not be cast.
        final WindowStoreIterator<byte[]> underlyingIterator = wrapped().fetch(bytes, j, j2);
        if (this.internalContext.cache() == null) {
            return underlyingIterator;
        }

        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            wrapped().persistent()
                ? new CacheIteratorWrapper(this, bytes, j, j2, true)
                : this.internalContext.cache().range(
                    this.cacheName,
                    this.cacheFunction.cacheKey(this.keySchema.lowerRangeFixedSize(bytes, j)),
                    this.cacheFunction.cacheKey(this.keySchema.upperRangeFixedSize(bytes, j2))),
            this.keySchema.hasNextCondition(bytes, bytes, j, j2, true),
            this.cacheFunction);

        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, true);
    }

    /**
     * Backward fetch of all windows of one key within a time range, combining cached entries
     * with the wrapped store's result.
     *
     * <p>If the context has no cache, the wrapped store's iterator is returned as is. For a
     * persistent wrapped store the cache is read segment by segment through
     * {@link CacheIteratorWrapper}; otherwise a single reverse cache range is used.
     *
     * @param bytes the record key
     * @param j     lower time bound
     * @param j2    upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public synchronized WindowStoreIterator<byte[]> backwardFetch(Bytes bytes, long j, long j2) {
        validateStoreOpen();
        // The key is passed as-is: it is a Bytes key, not a store, so it must not be cast.
        final WindowStoreIterator<byte[]> underlyingIterator = wrapped().backwardFetch(bytes, j, j2);
        if (this.internalContext.cache() == null) {
            return underlyingIterator;
        }

        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            wrapped().persistent()
                ? new CacheIteratorWrapper(this, bytes, j, j2, false)
                : this.internalContext.cache().reverseRange(
                    this.cacheName,
                    this.cacheFunction.cacheKey(this.keySchema.lowerRangeFixedSize(bytes, j)),
                    this.cacheFunction.cacheKey(this.keySchema.upperRangeFixedSize(bytes, j2))),
            this.keySchema.hasNextCondition(bytes, bytes, j, j2, false),
            this.cacheFunction);

        return new MergedSortedCacheWindowStoreIterator(filteredCacheIterator, underlyingIterator, false);
    }

    /**
     * Forward fetch over a key range and a time range, combining cached entries with the wrapped
     * store's result.
     *
     * <p>If both key bounds are given and {@code bytes} sorts after {@code bytes2}, a warning is
     * logged and an empty iterator is returned without touching the store. If the context has no
     * cache, the wrapped store's iterator is returned as is.
     *
     * @param bytes  lower key bound, or {@code null} for an open bound
     * @param bytes2 upper key bound, or {@code null} for an open bound
     * @param j      lower time bound
     * @param j2     upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        final boolean invertedKeyRange = bytes != null && bytes2 != null && bytes.compareTo(bytes2) > 0;
        if (invertedKeyRange) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        validateStoreOpen();
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetch(bytes, bytes2, j, j2);
        if (this.internalContext.cache() == null) {
            return underlyingIterator;
        }

        // Persistent stores are read segment by segment; otherwise one cache range suffices.
        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            wrapped().persistent()
                ? new CacheIteratorWrapper(bytes, bytes2, j, j2, true)
                : this.internalContext.cache().range(
                    this.cacheName,
                    bytes == null ? null : this.cacheFunction.cacheKey(this.keySchema.lowerRange(bytes, j)),
                    bytes2 == null ? null : this.cacheFunction.cacheKey(this.keySchema.upperRange(bytes2, j2))),
            this.keySchema.hasNextCondition(bytes, bytes2, j, j2, true),
            this.cacheFunction);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            this.bytesSerdes,
            this.windowSize,
            this.cacheFunction,
            true);
    }

    /**
     * Backward fetch over a key range and a time range, combining cached entries with the
     * wrapped store's result.
     *
     * <p>If both key bounds are given and {@code bytes} sorts after {@code bytes2}, a warning is
     * logged and an empty iterator is returned without touching the store. If the context has no
     * cache, the wrapped store's iterator is returned as is.
     *
     * @param bytes  lower key bound, or {@code null} for an open bound
     * @param bytes2 upper key bound, or {@code null} for an open bound
     * @param j      lower time bound
     * @param j2     upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetch(Bytes bytes, Bytes bytes2, long j, long j2) {
        final boolean invertedKeyRange = bytes != null && bytes2 != null && bytes.compareTo(bytes2) > 0;
        if (invertedKeyRange) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        validateStoreOpen();
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().backwardFetch(bytes, bytes2, j, j2);
        if (this.internalContext.cache() == null) {
            return underlyingIterator;
        }

        // Persistent stores are read segment by segment; otherwise one reverse cache range suffices.
        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            wrapped().persistent()
                ? new CacheIteratorWrapper(bytes, bytes2, j, j2, false)
                : this.internalContext.cache().reverseRange(
                    this.cacheName,
                    bytes == null ? null : this.cacheFunction.cacheKey(this.keySchema.lowerRange(bytes, j)),
                    bytes2 == null ? null : this.cacheFunction.cacheKey(this.keySchema.upperRange(bytes2, j2))),
            this.keySchema.hasNextCondition(bytes, bytes2, j, j2, false),
            this.cacheFunction);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            this.bytesSerdes,
            this.windowSize,
            this.cacheFunction,
            false);
    }

    /**
     * Forward fetch of every key within a time range, combining cached entries with the wrapped
     * store's result.
     *
     * <p>The whole named cache is iterated and narrowed to the time range by the key schema's
     * has-next condition (no key bounds).
     *
     * @param j  lower time bound
     * @param j2 upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> fetchAll(long j, long j2) {
        validateStoreOpen();

        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            this.internalContext.cache().all(this.cacheName),
            this.keySchema.hasNextCondition(null, null, j, j2, true),
            this.cacheFunction);
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().fetchAll(j, j2);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            this.bytesSerdes,
            this.windowSize,
            this.cacheFunction,
            true);
    }

    /**
     * Backward fetch of every key within a time range, combining cached entries with the wrapped
     * store's result.
     *
     * <p>The whole named cache is iterated in reverse and narrowed to the time range by the key
     * schema's has-next condition (no key bounds).
     *
     * @param j  lower time bound
     * @param j2 upper time bound
     * @return an iterator over the store result combined with the filtered cache entries
     */
    @Override // org.apache.kafka.streams.state.WindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardFetchAll(long j, long j2) {
        validateStoreOpen();

        final FilteredCacheIterator filteredCacheIterator = new FilteredCacheIterator(
            this.internalContext.cache().reverseAll(this.cacheName),
            this.keySchema.hasNextCondition(null, null, j, j2, false),
            this.cacheFunction);
        final KeyValueIterator<Windowed<Bytes>, byte[]> underlyingIterator = wrapped().backwardFetchAll(j, j2);

        return new MergedSortedCacheWindowStoreKeyValueIterator(
            filteredCacheIterator,
            underlyingIterator,
            this.bytesSerdes,
            this.windowSize,
            this.cacheFunction,
            false);
    }

    /**
     * Forward iteration over every entry of the store.
     *
     * <p>After checking that the store is open, an iterator over the whole named cache and the
     * wrapped store's {@code all()} iterator are handed (in that order of creation) to a
     * {@code MergedSortedCacheWindowStoreKeyValueIterator} in forward mode. Unlike the ranged
     * fetches, the cache iterator is not wrapped in a filter here.
     *
     * @return the combined iterator over cache and wrapped store
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> all() {
        validateStoreOpen();
        return new MergedSortedCacheWindowStoreKeyValueIterator(this.internalContext.cache().all(this.cacheName), wrapped().all(), this.bytesSerdes, this.windowSize, this.cacheFunction, true);
    }

    /**
     * Backward iteration over every entry of the store.
     *
     * <p>After checking that the store is open, a reverse iterator over the whole named cache and
     * the wrapped store's {@code backwardAll()} iterator are handed (in that order of creation)
     * to a {@code MergedSortedCacheWindowStoreKeyValueIterator} in backward mode. Unlike the
     * ranged fetches, the cache iterator is not wrapped in a filter here.
     *
     * @return the combined iterator over cache and wrapped store
     */
    @Override // org.apache.kafka.streams.state.ReadOnlyWindowStore
    public KeyValueIterator<Windowed<Bytes>, byte[]> backwardAll() {
        validateStoreOpen();
        return new MergedSortedCacheWindowStoreKeyValueIterator(this.internalContext.cache().reverseAll(this.cacheName), wrapped().backwardAll(), this.bytesSerdes, this.windowSize, this.cacheFunction, false);
    }

    /**
     * Flushes this store's cache and then the wrapped store.
     *
     * <p>The order matters: the cache is flushed first so that the wrapped store is flushed only
     * after the cache flush has run (presumably that flush invokes the listener registered in
     * {@code init}, which writes dirty entries into the wrapped store; confirm in ThreadCache).
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public synchronized void flush() {
        this.internalContext.cache().flush(this.cacheName);
        wrapped().flush();
    }

    /**
     * Flushes only this store's named cache; the wrapped store itself is not flushed.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public void flushCache() {
        this.internalContext.cache().flush(this.cacheName);
    }

    /**
     * Clears this store's named cache by delegating to the thread cache's {@code clear}.
     * NOTE(review): presumably this discards entries without flushing them; confirm in ThreadCache.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.state.internals.CachedStateStore
    public void clearCache() {
        this.internalContext.cache().clear(this.cacheName);
    }

    /**
     * Closes the store: flushes the named cache, closes the named cache, then closes the wrapped
     * store.
     *
     * <p>The three steps are passed to {@code ExceptionUtils.executeAll}, which returns the
     * runtime exceptions it collected (presumably every step runs even if an earlier one fails;
     * confirm in ExceptionUtils). If any were collected they are rethrown through
     * {@code ExceptionUtils.throwSuppressed} with a message naming this store.
     */
    @Override // org.apache.kafka.streams.state.internals.WrappedStateStore, org.apache.kafka.streams.processor.StateStore
    public synchronized void close() {
        final WindowStore<Bytes, byte[]> underlying = wrapped();
        Objects.requireNonNull(underlying);

        final LinkedList<RuntimeException> failures = ExceptionUtils.executeAll(
            () -> this.internalContext.cache().flush(this.cacheName),
            () -> this.internalContext.cache().close(this.cacheName),
            underlying::close);

        if (!failures.isEmpty()) {
            ExceptionUtils.throwSuppressed("Caught an exception while closing caching window store for store " + name(), failures);
        }
    }
}
