/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.LRUCacheEntry;
import org.apache.kafka.streams.state.internals.NamedCache;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Test;

public class ThreadCacheTest {
    // Cache namespace names. The test methods in this class pass equal string literals
    // directly instead of reading these fields.
    final String namespace = "0.0-namespace";
    final String namespace1 = "0.1-namespace";
    final String namespace2 = "0.2-namespace";
    // Log context handed to every ThreadCache constructed by these tests.
    private final LogContext logContext = new LogContext("testCache ");
    // Eleven single-byte arrays holding the values 0..10; setupThreadCache uses bytes[i]
    // both as the key and as the value of the i-th entry it inserts.
    private final byte[][] bytes = new byte[][]{{0}, {1}, {2}, {3}, {4}, {5}, {6}, {7}, {8}, {9}, {10}};

    /**
     * Puts five dirty entries into a cache sized for exactly five such entries, reads each
     * one back, and checks the cache's get/put/evict/flush counters.
     */
    @Test
    public void basicPutGet() {
        final List<KeyValue<String, String>> toInsert = Arrays.asList(
            new KeyValue<>("K1", "V1"),
            new KeyValue<>("K2", "V2"),
            new KeyValue<>("K3", "V3"),
            new KeyValue<>("K4", "V4"),
            new KeyValue<>("K5", "V5"));
        final KeyValue<String, String> sample = toInsert.get(0);
        // Every key and every value has the same length, so one sample sizes the whole cache.
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(
            sample.key.getBytes(StandardCharsets.UTF_8),
            sample.value.getBytes(StandardCharsets.UTF_8),
            "");
        final ThreadCache cache =
            new ThreadCache(this.logContext, toInsert.size() * entrySize, new MockStreamsMetrics(new Metrics()));

        for (final KeyValue<String, String> kvToInsert : toInsert) {
            final Bytes key = Bytes.wrap(kvToInsert.key.getBytes(StandardCharsets.UTF_8));
            final byte[] value = kvToInsert.value.getBytes(StandardCharsets.UTF_8);
            cache.put("0.0-namespace", key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
        }

        for (final KeyValue<String, String> kvToInsert : toInsert) {
            final Bytes key = Bytes.wrap(kvToInsert.key.getBytes(StandardCharsets.UTF_8));
            final LRUCacheEntry entry = cache.get("0.0-namespace", key);
            Assert.assertTrue(entry.isDirty());
            // Decode with the same explicit charset used to encode, and keep JUnit's
            // (expected, actual) order so failure messages read correctly.
            Assert.assertEquals(kvToInsert.value, new String(entry.value(), StandardCharsets.UTF_8));
        }

        Assert.assertEquals(5L, cache.gets());
        Assert.assertEquals(5L, cache.puts());
        Assert.assertEquals(0L, cache.evicts());
        Assert.assertEquals(0L, cache.flushes());
    }

    /**
     * Fills a cache with as many fixed-size dirty entries as fit into {@code desiredCacheSize}
     * (according to {@code memoryCacheEntrySize}) and checks two bounds.
     *
     * <ul>
     *   <li>the size reported by the cache is at most {@code desiredCacheSize * (1 + entryFactor)};</li>
     *   <li>the growth in used heap is at most the reported size times {@code systemFactor}.</li>
     * </ul>
     *
     * @param entryFactor      allowed relative overshoot of the reported cache size
     * @param systemFactor     allowed ratio of measured heap growth to reported cache size
     * @param desiredCacheSize cache capacity in bytes
     * @param keySizeBytes     key size used to estimate how many entries fit
     * @param valueSizeBytes   size of each inserted value
     */
    private void checkOverheads(double entryFactor, double systemFactor, long desiredCacheSize, int keySizeBytes, int valueSizeBytes) {
        final Runtime runtime = Runtime.getRuntime();
        final long numElements =
            desiredCacheSize / ThreadCacheTest.memoryCacheEntrySize(new byte[keySizeBytes], new byte[valueSizeBytes], "");

        // Baseline heap usage, sampled after requesting a GC and before the cache exists.
        System.gc();
        final long prevRuntimeMemory = runtime.totalMemory() - runtime.freeMemory();

        final ThreadCache cache = new ThreadCache(this.logContext, desiredCacheSize, new MockStreamsMetrics(new Metrics()));
        Assert.assertEquals(0L, cache.sizeBytes());

        for (long i = 0; i < numElements; i++) {
            final Bytes key = Bytes.wrap(("K" + i).getBytes(StandardCharsets.UTF_8));
            final byte[] value = new byte[valueSizeBytes];
            cache.put("0.0-namespace", key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
        }

        System.gc();
        final double ceiling = (double) desiredCacheSize + (double) desiredCacheSize * entryFactor;
        final long usedRuntimeMemory = runtime.totalMemory() - runtime.freeMemory() - prevRuntimeMemory;
        final long cacheSizeBytes = cache.sizeBytes();

        Assert.assertTrue(
            "Cache size " + cacheSizeBytes + " greater than ceiling " + ceiling,
            (double) cacheSizeBytes <= ceiling);
        Assert.assertTrue(
            "Used memory size " + usedRuntimeMemory + " greater than expected " + (double) cacheSizeBytes * systemFactor,
            (double) cacheSizeBytes * systemFactor >= (double) usedRuntimeMemory);
    }

    /**
     * Runs the overhead check with 8-byte keys and 100-byte values, allowing a 5% overshoot
     * of the reported cache size and a 3x ratio of heap growth to reported size.
     */
    @Test
    public void cacheOverheadsSmallValues() {
        final double factor = 0.05;
        final double systemFactor = 3.0;
        final int keySizeBytes = 8;
        final int valueSizeBytes = 100;
        // Target 100 MiB (0x6400000 bytes), capped at the JVM's maximum heap size.
        final long desiredCacheSize = Math.min(100L * 1024L * 1024L, Runtime.getRuntime().maxMemory());
        this.checkOverheads(factor, systemFactor, desiredCacheSize, keySizeBytes, valueSizeBytes);
    }

    /**
     * Runs the overhead check with 8-byte keys and 1000-byte values, allowing a 5% overshoot
     * of the reported cache size and a 2x ratio of heap growth to reported size.
     */
    @Test
    public void cacheOverheadsLargeValues() {
        final double factor = 0.05;
        final double systemFactor = 2.0;
        final int keySizeBytes = 8;
        final int valueSizeBytes = 1000;
        // Target 100 MiB (0x6400000 bytes), capped at the JVM's maximum heap size.
        final long desiredCacheSize = Math.min(100L * 1024L * 1024L, Runtime.getRuntime().maxMemory());
        this.checkOverheads(factor, systemFactor, desiredCacheSize, keySizeBytes, valueSizeBytes);
    }

    /**
     * Estimates the number of bytes one cache entry occupies: the key length counted twice,
     * the value length, the topic name length, and 45 bytes of fixed overhead.
     *
     * @param key   entry key
     * @param value entry value
     * @param topic topic name; its {@code length()} (in chars) is added
     * @return the estimated entry size in bytes
     */
    static long memoryCacheEntrySize(byte[] key, byte[] value, String topic) {
        // NOTE(review): the fixed terms presumably mirror the per-field accounting of
        // LRUCacheEntry (1 + 8 + 8 + 4) and NamedCache.LRUNode (8 + 8 + 8) - confirm there.
        final int entryPart = key.length + value.length + 1 + 8 + 8 + 4 + topic.length();
        final int nodePart = key.length + 8 + 8 + 8;
        return entryPart + nodePart;
    }

    /**
     * Puts five dirty entries into a cache sized for exactly one such entry and checks that
     * the flush listener received K1/V1 first and that four evictions were counted.
     */
    @Test
    public void evict() {
        final List<KeyValue<String, String>> received = new ArrayList<>();
        final List<KeyValue<String, String>> expected = Collections.singletonList(new KeyValue<>("K1", "V1"));
        final List<KeyValue<String, String>> toInsert = Arrays.asList(
            new KeyValue<>("K1", "V1"),
            new KeyValue<>("K2", "V2"),
            new KeyValue<>("K3", "V3"),
            new KeyValue<>("K4", "V4"),
            new KeyValue<>("K5", "V5"));
        final KeyValue<String, String> sample = toInsert.get(0);
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(
            sample.key.getBytes(StandardCharsets.UTF_8),
            sample.value.getBytes(StandardCharsets.UTF_8),
            "");
        final ThreadCache cache = new ThreadCache(this.logContext, entrySize, new MockStreamsMetrics(new Metrics()));

        // Record every entry handed to the listener as a (key, new value) string pair.
        cache.addDirtyEntryFlushListener("0.0-namespace", dirty -> {
            for (final ThreadCache.DirtyEntry dirtyEntry : dirty) {
                received.add(new KeyValue<>(
                    dirtyEntry.key().toString(),
                    new String(dirtyEntry.newValue(), StandardCharsets.UTF_8)));
            }
        });

        for (final KeyValue<String, String> kvToInsert : toInsert) {
            final Bytes key = Bytes.wrap(kvToInsert.key.getBytes(StandardCharsets.UTF_8));
            final byte[] value = kvToInsert.value.getBytes(StandardCharsets.UTF_8);
            cache.put("0.0-namespace", key, new LRUCacheEntry(value, new RecordHeaders(), true, 1L, 1L, 1, ""));
        }

        // Only the leading expected.size() received records are compared.
        for (int i = 0; i < expected.size(); i++) {
            Assert.assertEquals(expected.get(i), received.get(i));
        }
        Assert.assertEquals(4L, cache.evicts());
    }

    /**
     * Puts one dirty entry, deletes it, and checks that delete returned the stored value and
     * that a later get for the same key returns null.
     */
    @Test
    public void shouldDelete() {
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        final Bytes key = Bytes.wrap(new byte[]{0});
        // The key's own bytes double as the stored value.
        final byte[] storedValue = key.get();
        cache.put("0.0-namespace", key, this.dirtyEntry(storedValue));

        final byte[] deletedValue = cache.delete("0.0-namespace", key).value();
        Assert.assertArrayEquals(storedValue, deletedValue);
        Assert.assertNull(cache.get("0.0-namespace", key));
    }

    /**
     * Puts a dirty entry, deletes it, then flushes the namespace and checks that the flush
     * listener received nothing while the flush itself was still counted once.
     */
    @Test
    public void shouldNotFlushAfterDelete() {
        final Bytes key = Bytes.wrap(new byte[]{0});
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
        cache.addDirtyEntryFlushListener("0.0-namespace", received::addAll);

        cache.put("0.0-namespace", key, this.dirtyEntry(key.get()));
        Assert.assertArrayEquals(key.get(), cache.delete("0.0-namespace", key).value());

        cache.flush("0.0-namespace");
        // JUnit's argument order is (expected, actual).
        Assert.assertEquals(0L, received.size());
        Assert.assertEquals(1L, cache.flushes());
    }

    /**
     * Deleting a key that was never put into an otherwise populated namespace must return
     * null rather than throw.
     */
    @Test
    public void shouldNotBlowUpOnNonExistentKeyWhenDeleting() {
        final Bytes storedKey = Bytes.wrap(new byte[]{0});
        final Bytes absentKey = Bytes.wrap(new byte[]{1});
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        cache.put("0.0-namespace", storedKey, this.dirtyEntry(storedKey.get()));

        Assert.assertNull(cache.delete("0.0-namespace", absentKey));
    }

    /**
     * Deleting from a namespace that nothing was ever put into must return null rather
     * than throw.
     */
    @Test
    public void shouldNotBlowUpOnNonExistentNamespaceWhenDeleting() {
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        final Bytes anyKey = Bytes.wrap(new byte[]{1});

        Assert.assertNull(cache.delete("0.0-namespace", anyKey));
    }

    /**
     * Stores different values under the same key in two namespaces and checks that each
     * namespace returns its own value.
     */
    @Test
    public void shouldNotClashWithOverlappingNames() {
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        final Bytes sharedKey = Bytes.wrap(new byte[]{0});
        final byte[] valueInFirst = sharedKey.get();
        final byte[] valueInSecond = Bytes.wrap(new byte[]{1}).get();

        cache.put("0.1-namespace", sharedKey, this.dirtyEntry(valueInFirst));
        cache.put("0.2-namespace", sharedKey, this.dirtyEntry(valueInSecond));

        Assert.assertArrayEquals(valueInFirst, cache.get("0.1-namespace", sharedKey).value());
        Assert.assertArrayEquals(valueInSecond, cache.get("0.2-namespace", sharedKey).value());
    }

    /**
     * Builds a cache of the given capacity with a no-op flush listener on "0.0-namespace"
     * and fills it with dirty entries whose key and value are both {@code bytes[i]}.
     *
     * @param first     first index into {@code bytes} to insert
     * @param last      exclusive upper bound when ascending; inclusive lower bound when reversed
     * @param entrySize cache capacity in bytes, passed to the ThreadCache constructor
     * @param reverse   if true, insert from {@code first} down to {@code last}; otherwise
     *                  from {@code first} up to {@code last - 1}
     * @return the populated cache
     */
    private ThreadCache setupThreadCache(int first, int last, long entrySize, boolean reverse) {
        final ThreadCache cache = new ThreadCache(this.logContext, entrySize, new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener("0.0-namespace", dirty -> {});
        if (reverse) {
            for (int index = first; index >= last; index--) {
                cache.put("0.0-namespace", Bytes.wrap(this.bytes[index]), this.dirtyEntry(this.bytes[index]));
            }
        } else {
            for (int index = first; index < last; index++) {
                cache.put("0.0-namespace", Bytes.wrap(this.bytes[index]), this.dirtyEntry(this.bytes[index]));
            }
        }
        return cache;
    }

    /**
     * With only key 0 cached, peeking a range iterator over [0, 1] twice must return key 0
     * both times.
     */
    @Test
    public void shouldPeekNextKey() {
        final ThreadCache cache = this.setupThreadCache(0, 1, 10000L, false);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("0.0-namespace", from, to);

        Assert.assertEquals(from, iterator.peekNextKey());
        // A second peek must yield the same key.
        Assert.assertEquals(from, iterator.peekNextKey());
    }

    /**
     * With only key 1 cached, peeking a reverse-range iterator over [0, 1] twice must return
     * key 1 both times.
     */
    @Test
    public void shouldPeekNextKeyReverseRange() {
        final ThreadCache cache = this.setupThreadCache(1, 1, 10000L, true);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange("0.0-namespace", from, to);

        MatcherAssert.assertThat(iterator.peekNextKey(), Is.is(to));
        // A second peek must yield the same key.
        MatcherAssert.assertThat(iterator.peekNextKey(), Is.is(to));
    }

    /**
     * The key returned by peekNextKey() must equal the key of the element that the
     * following next() call returns, for a forward range iterator.
     */
    @Test
    public void shouldGetSameKeyAsPeekNext() {
        final ThreadCache cache = this.setupThreadCache(0, 1, 10000L, false);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("0.0-namespace", from, to);

        // Peek first, then advance; the order of these two calls matters.
        final Bytes peeked = iterator.peekNextKey();
        final Bytes advanced = iterator.next().key;
        MatcherAssert.assertThat(peeked, Is.is(advanced));
    }

    /**
     * The key returned by peekNextKey() must equal the key of the element that the
     * following next() call returns, for a reverse range iterator.
     */
    @Test
    public void shouldGetSameKeyAsPeekNextReverseRange() {
        final ThreadCache cache = this.setupThreadCache(1, 1, 10000L, true);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange("0.0-namespace", from, to);

        // Peek first, then advance; the order of these two calls matters.
        final Bytes peeked = iterator.peekNextKey();
        final Bytes advanced = iterator.next().key;
        MatcherAssert.assertThat(peeked, Is.is(advanced));
    }

    /**
     * Obtains an iterator from the supplier and asserts that peeking its next key throws
     * {@link NoSuchElementException}.
     *
     * @param methodUnderTest supplies the iterator to check
     */
    private void shouldThrowIfNoPeekNextKey(Supplier<ThreadCache.MemoryLRUCacheBytesIterator> methodUnderTest) {
        final ThreadCache.MemoryLRUCacheBytesIterator emptyIterator = methodUnderTest.get();
        Assert.assertThrows(NoSuchElementException.class, () -> {
            emptyIterator.peekNextKey();
        });
    }

    /**
     * Peeking a forward range iterator over a cache with no entries must throw.
     * setupThreadCache(0, 0, ..., false) inserts nothing because 0 &lt; 0 is false.
     */
    @Test
    public void shouldThrowIfNoPeekNextKeyRange() {
        final ThreadCache cache = this.setupThreadCache(0, 0, 10000L, false);
        final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> rangeOverEmptyCache =
            () -> cache.range("0.0-namespace", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
        this.shouldThrowIfNoPeekNextKey(rangeOverEmptyCache);
    }

    /**
     * Peeking a reverse range iterator over a cache with no entries must throw.
     * setupThreadCache(-1, 0, ..., true) inserts nothing because -1 &gt;= 0 is false.
     */
    @Test
    public void shouldThrowIfNoPeekNextKeyReverseRange() {
        final ThreadCache cache = this.setupThreadCache(-1, 0, 10000L, true);
        final Supplier<ThreadCache.MemoryLRUCacheBytesIterator> reverseRangeOverEmptyCache =
            () -> cache.reverseRange("0.0-namespace", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{1}));
        this.shouldThrowIfNoPeekNextKey(reverseRangeOverEmptyCache);
    }

    /**
     * hasNext() on a forward range iterator over a cache with no entries must be false.
     */
    @Test
    public void shouldReturnFalseIfNoNextKey() {
        // first == last in ascending mode, so the helper inserts nothing.
        final ThreadCache cache = this.setupThreadCache(0, 0, 10000L, false);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.range("0.0-namespace", from, to);

        Assert.assertFalse(iterator.hasNext());
    }

    /**
     * hasNext() on a reverse range iterator over a cache with no entries must be false.
     */
    @Test
    public void shouldReturnFalseIfNoNextKeyReverseRange() {
        // first (-1) is below last (0) in reverse mode, so the helper inserts nothing.
        final ThreadCache cache = this.setupThreadCache(-1, 0, 10000L, true);
        final Bytes from = Bytes.wrap(new byte[]{0});
        final Bytes to = Bytes.wrap(new byte[]{1});
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseRange("0.0-namespace", from, to);

        Assert.assertFalse(iterator.hasNext());
    }

    /**
     * With keys 0..9 cached, a range over [1, 4] must yield keys 1, 2, 3, 4 in that order,
     * and each peeked key must match the key then returned by next().
     */
    @Test
    public void shouldPeekAndIterateOverRange() {
        final ThreadCache cache = this.setupThreadCache(0, 10, 10000L, false);
        final ThreadCache.MemoryLRUCacheBytesIterator iterator =
            cache.range("0.0-namespace", Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));

        int bytesIndex = 1;
        while (iterator.hasNext()) {
            final byte[] expectedKey = this.bytes[bytesIndex++];
            final Bytes peekedKey = iterator.peekNextKey();
            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, next.key.get());
        }
        // Four keys visited starting at index 1 leaves the index at 5.
        Assert.assertEquals(5L, bytesIndex);
    }

    /**
     * With keys 0..9 cached, a range over [1, 4] requested with the extra boolean argument
     * set to false must yield only keys 1, 2, 3 (the upper bound is left out).
     */
    @Test
    public void shouldSkipToEntryWhenToInclusiveIsFalseInRange() {
        final ThreadCache cache = this.setupThreadCache(0, 10, 10000L, false);
        final ThreadCache.MemoryLRUCacheBytesIterator iterator =
            cache.range("0.0-namespace", Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}), false);

        int bytesIndex = 1;
        while (iterator.hasNext()) {
            final byte[] expectedKey = this.bytes[bytesIndex++];
            final Bytes peekedKey = iterator.peekNextKey();
            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, next.key.get());
        }
        // Three keys visited starting at index 1 leaves the index at 4.
        Assert.assertEquals(4L, bytesIndex);
    }

    /**
     * With keys 10..0 cached, a reverse range over [1, 4] must yield keys 4, 3, 2, 1 in that
     * order, and each peeked key must match the key then returned by next().
     */
    @Test
    public void shouldPeekAndIterateOverReverseRange() {
        final ThreadCache cache = this.setupThreadCache(10, 0, 10000L, true);
        final ThreadCache.MemoryLRUCacheBytesIterator iterator =
            cache.reverseRange("0.0-namespace", Bytes.wrap(new byte[]{1}), Bytes.wrap(new byte[]{4}));

        int bytesIndex = 4;
        while (iterator.hasNext()) {
            final byte[] expectedKey = this.bytes[bytesIndex--];
            final Bytes peekedKey = iterator.peekNextKey();
            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, next.key.get());
        }
        // Four keys visited counting down from index 4 leaves the index at 0.
        Assert.assertEquals(0L, bytesIndex);
    }

    /**
     * Fills a five-entry cache with keys 0..4, puts key 6 on top, and expects a range over
     * [0, 5] to start at key 1.
     */
    @Test
    public void shouldSkipEntriesWhereValueHasBeenEvictedFromCache() {
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(new byte[1], new byte[1], "");
        final long capacity = entrySize * 5L;
        final ThreadCache cache = this.setupThreadCache(0, 5, capacity, false);
        Assert.assertEquals(5L, cache.size());

        // NOTE(review): presumably this sixth put evicts key 0, the oldest entry - confirm.
        cache.put("0.0-namespace", Bytes.wrap(new byte[]{6}), this.dirtyEntry(new byte[]{6}));

        final ThreadCache.MemoryLRUCacheBytesIterator range =
            cache.range("0.0-namespace", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
        Assert.assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
    }

    /**
     * Fills a five-entry cache with keys 4..0 (inserted in that order), puts key 6 on top,
     * and expects a reverse range over [0, 5] to start at key 3.
     */
    @Test
    public void shouldSkipEntriesWhereValueHasBeenEvictedFromCacheReverseRange() {
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(new byte[1], new byte[1], "");
        final long capacity = entrySize * 5L;
        final ThreadCache cache = this.setupThreadCache(4, 0, capacity, true);
        Assert.assertEquals(5L, cache.size());

        // NOTE(review): presumably this sixth put evicts key 4, the oldest entry - confirm.
        cache.put("0.0-namespace", Bytes.wrap(new byte[]{6}), this.dirtyEntry(new byte[]{6}));

        final ThreadCache.MemoryLRUCacheBytesIterator range =
            cache.reverseRange("0.0-namespace", Bytes.wrap(new byte[]{0}), Bytes.wrap(new byte[]{5}));
        Assert.assertEquals(Bytes.wrap(new byte[]{3}), range.peekNextKey());
    }

    /**
     * With keys 0..10 cached, all() must yield all eleven keys in ascending order, and each
     * peeked key must match the key then returned by next().
     */
    @Test
    public void shouldFetchAllEntriesInCache() {
        final ThreadCache cache = this.setupThreadCache(0, 11, 10000L, false);
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.all("0.0-namespace");

        int bytesIndex = 0;
        while (iterator.hasNext()) {
            final byte[] expectedKey = this.bytes[bytesIndex++];
            final Bytes peekedKey = iterator.peekNextKey();
            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, next.key.get());
        }
        Assert.assertEquals(11L, bytesIndex);
    }

    /**
     * With keys 10..0 cached, reverseAll() must yield all eleven keys in descending order,
     * and each peeked key must match the key then returned by next().
     */
    @Test
    public void shouldFetchAllEntriesInCacheInReverseOrder() {
        final ThreadCache cache = this.setupThreadCache(10, 0, 10000L, true);
        final ThreadCache.MemoryLRUCacheBytesIterator iterator = cache.reverseAll("0.0-namespace");

        int bytesIndex = 10;
        while (iterator.hasNext()) {
            final byte[] expectedKey = this.bytes[bytesIndex--];
            final Bytes peekedKey = iterator.peekNextKey();
            final KeyValue<Bytes, LRUCacheEntry> next = iterator.next();
            Assert.assertArrayEquals(expectedKey, peekedKey.get());
            Assert.assertArrayEquals(expectedKey, next.key.get());
        }
        // Eleven keys visited counting down from index 10 leaves the index at -1.
        Assert.assertEquals(-1L, bytesIndex);
    }

    /**
     * Fills a five-entry cache with keys 0..4, puts key 6 on top, and expects all() to
     * start at key 1.
     */
    @Test
    public void shouldReturnAllUnevictedValuesFromCache() {
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(new byte[1], new byte[1], "");
        final long capacity = entrySize * 5L;
        final ThreadCache cache = this.setupThreadCache(0, 5, capacity, false);
        Assert.assertEquals(5L, cache.size());

        // NOTE(review): presumably this sixth put evicts key 0, the oldest entry - confirm.
        cache.put("0.0-namespace", Bytes.wrap(new byte[]{6}), this.dirtyEntry(new byte[]{6}));

        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.all("0.0-namespace");
        Assert.assertEquals(Bytes.wrap(new byte[]{1}), range.peekNextKey());
    }

    /**
     * Fills a five-entry cache with keys 4..0 (inserted in that order), puts key 6 on top,
     * and expects reverseAll() to start at key 6.
     */
    @Test
    public void shouldReturnAllUnevictedValuesFromCacheInReverseOrder() {
        final long entrySize = ThreadCacheTest.memoryCacheEntrySize(new byte[1], new byte[1], "");
        final long capacity = entrySize * 5L;
        final ThreadCache cache = this.setupThreadCache(4, 0, capacity, true);
        Assert.assertEquals(5L, cache.size());

        cache.put("0.0-namespace", Bytes.wrap(new byte[]{6}), this.dirtyEntry(new byte[]{6}));

        final ThreadCache.MemoryLRUCacheBytesIterator range = cache.reverseAll("0.0-namespace");
        Assert.assertEquals(Bytes.wrap(new byte[]{6}), range.peekNextKey());
    }

    @Test
    public void shouldFlushDirtyEntriesForNamespace() {
        ThreadCache cache = new ThreadCache(this.logContext, 100000L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()));
        ArrayList received = new ArrayList();
        cache.addDirtyEntryFlushListener("0.1-namespace", dirty -> {
            for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
                received.add(dirtyEntry.key().get());
            }
        });
        List<byte[]> expected = Arrays.asList({0}, {1}, {2});
        for (byte[] bytes : expected) {
            cache.put("0.1-namespace", Bytes.wrap((byte[])bytes), this.dirtyEntry(bytes));
        }
        cache.put("0.2-namespace", Bytes.wrap((byte[])new byte[]{4}), this.dirtyEntry(new byte[]{4}));
        cache.flush("0.1-namespace");
        Assert.assertEquals(expected, received);
    }

    @Test
    public void shouldNotFlushCleanEntriesForNamespace() {
        ThreadCache cache = new ThreadCache(this.logContext, 100000L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics()));
        ArrayList received = new ArrayList();
        cache.addDirtyEntryFlushListener("0.1-namespace", dirty -> {
            for (ThreadCache.DirtyEntry dirtyEntry : dirty) {
                received.add(dirtyEntry.key().get());
            }
        });
        List<byte[]> toInsert = Arrays.asList({0}, {1}, {2});
        for (byte[] bytes : toInsert) {
            cache.put("0.1-namespace", Bytes.wrap((byte[])bytes), this.cleanEntry(bytes));
        }
        cache.put("0.2-namespace", Bytes.wrap((byte[])new byte[]{4}), this.cleanEntry(new byte[]{4}));
        cache.flush("0.1-namespace");
        Assert.assertEquals(Collections.emptyList(), received);
    }

    /**
     * Puts one dirty entry into the given cache and expects the flush listener to have
     * received exactly one entry right after the put, and still exactly one after an
     * explicit flush of the namespace.
     *
     * @param cache the cache under test; callers pass caches with capacity 0 or 1
     */
    private void shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(ThreadCache cache) {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
        cache.addDirtyEntryFlushListener("0.0-namespace", received::addAll);

        final byte[] keyBytes = new byte[]{0};
        final byte[] valueBytes = new byte[]{0};
        cache.put("0.0-namespace", Bytes.wrap(keyBytes), this.dirtyEntry(valueBytes));
        Assert.assertEquals(1L, received.size());

        // The explicit flush must not deliver the entry a second time.
        cache.flush("0.0-namespace");
        Assert.assertEquals(1L, received.size());
    }

    /** Runs the immediate-eviction check against a cache with a capacity of one byte. */
    @Test
    public void shouldEvictImmediatelyIfCacheSizeIsVerySmall() {
        final long oneByte = 1L;
        this.shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(
            new ThreadCache(this.logContext, oneByte, new MockStreamsMetrics(new Metrics())));
    }

    /** Runs the immediate-eviction check against a cache with a capacity of zero bytes. */
    @Test
    public void shouldEvictImmediatelyIfCacheSizeIsZero() {
        final long zeroBytes = 0L;
        this.shouldEvictImmediatelyIfCacheSizeIsZeroOrVerySmall(
            new ThreadCache(this.logContext, zeroBytes, new MockStreamsMetrics(new Metrics())));
    }

    /**
     * Calls putAll with two dirty entries on a one-byte cache and checks that two evictions
     * were counted and two entries reached the flush listener.
     */
    @Test
    public void shouldEvictAfterPutAll() {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
        final ThreadCache cache = new ThreadCache(this.logContext, 1L, new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener("0.0-namespace", received::addAll);

        cache.putAll("0.0-namespace", Arrays.asList(
            KeyValue.pair(Bytes.wrap(new byte[]{0}), this.dirtyEntry(new byte[]{5})),
            KeyValue.pair(Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[]{6}))));

        // JUnit's argument order is (expected, actual).
        Assert.assertEquals(2L, cache.evicts());
        Assert.assertEquals(2L, received.size());
    }

    /**
     * Calls putAll with two dirty entries on a large cache and checks that both values can
     * be read back under their keys.
     */
    @Test
    public void shouldPutAll() {
        final ThreadCache cache = new ThreadCache(this.logContext, 100000L, new MockStreamsMetrics(new Metrics()));
        final List<KeyValue<Bytes, LRUCacheEntry>> entries = Arrays.asList(
            KeyValue.pair(Bytes.wrap(new byte[]{0}), this.dirtyEntry(new byte[]{5})),
            KeyValue.pair(Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[]{6})));
        cache.putAll("0.0-namespace", entries);

        final byte[] expectedForKey0 = new byte[]{5};
        final byte[] expectedForKey1 = new byte[]{6};
        Assert.assertArrayEquals(expectedForKey0, cache.get("0.0-namespace", Bytes.wrap(new byte[]{0})).value());
        Assert.assertArrayEquals(expectedForKey1, cache.get("0.0-namespace", Bytes.wrap(new byte[]{1})).value());
    }

    /**
     * Puts a clean entry into a zero-capacity cache and checks that nothing was handed to
     * the flush listener.
     */
    @Test
    public void shouldNotForwardCleanEntryOnEviction() {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
        final ThreadCache cache = new ThreadCache(this.logContext, 0L, new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener("0.0-namespace", received::addAll);

        final Bytes key = Bytes.wrap(new byte[]{1});
        cache.put("0.0-namespace", key, this.cleanEntry(new byte[]{0}));

        Assert.assertEquals(0L, received.size());
    }

    /**
     * The first putIfAbsent for a key must return null; a second one with a different value
     * must return the original value and leave it in place.
     */
    @Test
    public void shouldPutIfAbsent() {
        final ThreadCache cache = new ThreadCache(this.logContext, 100000L, new MockStreamsMetrics(new Metrics()));
        final Bytes key = Bytes.wrap(new byte[]{10});
        final byte[] value = {30};

        Assert.assertNull(cache.putIfAbsent("0.0-namespace", key, this.dirtyEntry(value)));

        final byte[] previous = cache.putIfAbsent("0.0-namespace", key, this.dirtyEntry(new byte[]{8})).value();
        Assert.assertArrayEquals(value, previous);

        final byte[] stored = cache.get("0.0-namespace", key).value();
        Assert.assertArrayEquals(value, stored);
    }

    /**
     * Calls putIfAbsent three times (key 0, key 1, key 1 again) on a one-byte cache and
     * checks that three evictions were counted and three entries reached the flush listener.
     */
    @Test
    public void shouldEvictAfterPutIfAbsent() {
        final List<ThreadCache.DirtyEntry> received = new ArrayList<>();
        final ThreadCache cache = new ThreadCache(this.logContext, 1L, new MockStreamsMetrics(new Metrics()));
        cache.addDirtyEntryFlushListener("0.0-namespace", received::addAll);

        cache.putIfAbsent("0.0-namespace", Bytes.wrap(new byte[]{0}), this.dirtyEntry(new byte[]{5}));
        cache.putIfAbsent("0.0-namespace", Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[]{6}));
        cache.putIfAbsent("0.0-namespace", Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[]{6}));

        // JUnit's argument order is (expected, actual).
        Assert.assertEquals(3L, cache.evicts());
        Assert.assertEquals(3L, received.size());
    }

    /**
     * Regression-style check with no assertions: the test passes if the final put returns.
     * The listener for "0.0-namespace" itself puts into "0.1-namespace" when invoked, and the
     * final put is 100 bytes larger than the capacity that was still free.
     */
    @Test
    public void shouldNotLoopForEverWhenEvictingAndCurrentCacheIsEmpty() {
        final int maxCacheSizeInBytes = 100;
        final ThreadCache threadCache =
            new ThreadCache(this.logContext, maxCacheSizeInBytes, new MockStreamsMetrics(new Metrics()));

        // Flushing "0.0-namespace" triggers a put into another namespace.
        threadCache.addDirtyEntryFlushListener("0.0-namespace", dirty -> {
            threadCache.put("0.1-namespace", Bytes.wrap(new byte[]{0}), this.dirtyEntry(new byte[2]));
        });
        threadCache.addDirtyEntryFlushListener("0.1-namespace", dirty -> {});
        threadCache.addDirtyEntryFlushListener("0.2-namespace", dirty -> {});

        threadCache.put("0.2-namespace", Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[1]));
        threadCache.put("0.0-namespace", Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[1]));

        final int remaining = (int) (maxCacheSizeInBytes - threadCache.sizeBytes());
        final byte[] oversizedValue = new byte[remaining + 100];
        threadCache.put("0.0-namespace", Bytes.wrap(new byte[]{2}), this.dirtyEntry(oversizedValue));
    }

    /**
     * Puts one clean entry into each of two namespaces, closes "0.2-namespace", and checks
     * that the total entry count drops from 2 to 1 and the closed namespace's key is gone.
     */
    @Test
    public void shouldCleanupNamedCacheOnClose() {
        final ThreadCache cache = new ThreadCache(this.logContext, 100000L, new MockStreamsMetrics(new Metrics()));
        cache.put("0.1-namespace", Bytes.wrap(new byte[]{1}), this.cleanEntry(new byte[]{1}));
        cache.put("0.2-namespace", Bytes.wrap(new byte[]{1}), this.cleanEntry(new byte[]{1}));
        // JUnit's argument order is (expected, actual).
        Assert.assertEquals(2L, cache.size());

        cache.close("0.2-namespace");
        Assert.assertEquals(1L, cache.size());
        Assert.assertNull(cache.get("0.2-namespace", Bytes.wrap(new byte[]{1})));
    }

    /** A get with a null key on a namespace that holds an entry must return null. */
    @Test
    public void shouldReturnNullIfKeyIsNull() {
        final ThreadCache threadCache = new ThreadCache(this.logContext, 10L, new MockStreamsMetrics(new Metrics()));
        final Bytes existingKey = Bytes.wrap(new byte[]{1});
        threadCache.put("0.0-namespace", existingKey, this.cleanEntry(new byte[]{1}));

        Assert.assertNull(threadCache.get("0.0-namespace", null));
    }

    /**
     * After a single put, the cache's reported size in bytes must equal the size of a
     * standalone LRUNode built from an equal key and a one-byte value.
     */
    @Test
    public void shouldCalculateSizeInBytes() {
        final ThreadCache cache = new ThreadCache(this.logContext, 100000L, new MockStreamsMetrics(new Metrics()));
        // The reference node wraps a dirty entry while the cache receives a clean one.
        // NOTE(review): presumably the dirty flag does not affect the computed size - confirm.
        final NamedCache.LRUNode node = new NamedCache.LRUNode(Bytes.wrap(new byte[]{1}), this.dirtyEntry(new byte[]{0}));
        cache.put("0.1-namespace", Bytes.wrap(new byte[]{1}), this.cleanEntry(new byte[]{0}));

        // JUnit's argument order is (expected, actual): the node size is the reference value.
        Assert.assertEquals(node.size(), cache.sizeBytes());
    }

    /**
     * Puts three clean one-byte entries (141 bytes reported), shrinks the cache to 100 bytes
     * and expects 94 bytes reported, then puts one more entry into another namespace and
     * expects the reported size to stay at 94.
     */
    @Test
    public void shouldResizeAndShrink() {
        final ThreadCache cache = new ThreadCache(this.logContext, 10000L, new MockStreamsMetrics(new Metrics()));
        for (byte keyByte = 1; keyByte <= 3; keyByte++) {
            cache.put("0.0-namespace", Bytes.wrap(new byte[]{keyByte}), this.cleanEntry(new byte[]{0}));
        }
        // 141 = 3 x 47.
        Assert.assertEquals(141L, cache.sizeBytes());

        cache.resize(100L);
        // 94 = 2 x 47. NOTE(review): presumably one entry was evicted to fit - confirm.
        Assert.assertEquals(94L, cache.sizeBytes());

        cache.put("0.1-namespace", Bytes.wrap(new byte[]{4}), this.cleanEntry(new byte[]{0}));
        Assert.assertEquals(94L, cache.sizeBytes());
    }

    /**
     * Builds a cache entry flagged dirty around the given bytes, with empty record headers,
     * -1 for each of the three numeric constructor arguments and an empty string for the last.
     *
     * @param key the bytes passed as the entry's first constructor argument; despite the
     *            name, callers read them back through {@code value()}
     * @return the new dirty entry
     */
    private LRUCacheEntry dirtyEntry(byte[] key) {
        final Headers emptyHeaders = new RecordHeaders();
        final boolean dirty = true;
        return new LRUCacheEntry(key, emptyHeaders, dirty, -1L, -1L, -1, "");
    }

    /**
     * Builds a cache entry from the given bytes using the single-argument constructor.
     * NOTE(review): presumably that constructor yields a non-dirty entry, as the helper's
     * name suggests - confirm in LRUCacheEntry.
     *
     * @param key the bytes passed to the constructor; despite the name, callers read them
     *            back through {@code value()}
     * @return the new entry
     */
    private LRUCacheEntry cleanEntry(byte[] key) {
        final LRUCacheEntry entry = new LRUCacheEntry(key);
        return entry;
    }
}

