package org.apache.kafka.streams.state.internals;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.class */
public class CachingPersistentWindowStoreTest {
    private static final int MAX_CACHE_SIZE_BYTES = 150;
    private static final long DEFAULT_TIMESTAMP = 10;
    private static final Long WINDOW_SIZE = Long.valueOf(DEFAULT_TIMESTAMP);
    private static final long SEGMENT_INTERVAL = 100;
    private static final String TOPIC = "topic";
    private static final String CACHE_NAMESPACE = "0_0-store-name";
    private InternalMockProcessorContext context;
    private RocksDBSegmentedBytesStore bytesStore;
    private WindowStore<Bytes, byte[]> underlyingStore;
    private CachingWindowStore cachingStore;
    private CacheFlushListenerStub<Windowed<String>, String> cacheListener;
    private ThreadCache cache;
    private WindowKeySchema keySchema;

    @Before
    public void setUp() {
        this.keySchema = new WindowKeySchema();
        this.bytesStore = new RocksDBSegmentedBytesStore("test", "metrics-scope", 0L, SEGMENT_INTERVAL, this.keySchema);
        this.underlyingStore = new RocksDBWindowStore(this.bytesStore, false, WINDOW_SIZE.longValue());
        TimeWindowedDeserializer timeWindowedDeserializer = new TimeWindowedDeserializer(new StringDeserializer(), WINDOW_SIZE);
        timeWindowedDeserializer.setIsChangelogTopic(true);
        this.cacheListener = new CacheFlushListenerStub<>(timeWindowedDeserializer, new StringDeserializer());
        this.cachingStore = new CachingWindowStore(this.underlyingStore, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
        this.cachingStore.setFlushListener(this.cacheListener, false);
        this.cache = new ThreadCache(new LogContext("testCache "), 150L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, TOPIC, new RecordHeaders()));
        this.cachingStore.init(this.context, this.cachingStore);
    }

    @After
    public void closeStore() {
        this.cachingStore.close();
    }

    @Test
    public void shouldDelegateDeprecatedInit() {
        WindowStore windowStore = (WindowStore) EasyMock.mock(WindowStore.class);
        CachingWindowStore cachingWindowStore = new CachingWindowStore(windowStore, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
        EasyMock.expect(windowStore.name()).andStubReturn("store");
        windowStore.init(this.context, cachingWindowStore);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{windowStore});
        cachingWindowStore.init(this.context, cachingWindowStore);
        EasyMock.verify(new Object[]{windowStore});
    }

    @Test
    public void shouldDelegateInit() {
        WindowStore windowStore = (WindowStore) EasyMock.mock(WindowStore.class);
        CachingWindowStore cachingWindowStore = new CachingWindowStore(windowStore, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
        EasyMock.expect(windowStore.name()).andStubReturn("store");
        windowStore.init(this.context, cachingWindowStore);
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{windowStore});
        cachingWindowStore.init(this.context, cachingWindowStore);
        EasyMock.verify(new Object[]{windowStore});
    }

    @Test
    public void shouldNotReturnDuplicatesInRanges() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore("store-name", Duration.ofHours(1L), Duration.ofMinutes(1L), false), Serdes.String(), Serdes.String()).withCachingEnabled());
        streamsBuilder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String())).transform(() -> {
            return new Transformer<String, String, KeyValue<String, String>>() { // from class: org.apache.kafka.streams.state.internals.CachingPersistentWindowStoreTest.1
                private int numRecordsProcessed;
                private ProcessorContext context;
                private WindowStore store;

                public void init(ProcessorContext processorContext) {
                    this.context = processorContext;
                    this.store = processorContext.getStateStore("store-name");
                    int i = 0;
                    KeyValueIterator all = this.store.all();
                    Throwable th = null;
                    while (all.hasNext()) {
                        try {
                            try {
                                i++;
                                all.next();
                            } finally {
                            }
                        } catch (Throwable th2) {
                            if (all != null) {
                                if (th != null) {
                                    try {
                                        all.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    all.close();
                                }
                            }
                            throw th2;
                        }
                    }
                    if (all != null) {
                        if (0 != 0) {
                            try {
                                all.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            all.close();
                        }
                    }
                    MatcherAssert.assertThat(Integer.valueOf(i), CoreMatchers.equalTo(0));
                }

                public KeyValue<String, String> transform(String str, String str2) {
                    int i = 0;
                    KeyValueIterator all = this.store.all();
                    Throwable th = null;
                    while (all.hasNext()) {
                        try {
                            try {
                                i++;
                                all.next();
                            } finally {
                            }
                        } catch (Throwable th2) {
                            if (all != null) {
                                if (th != null) {
                                    try {
                                        all.close();
                                    } catch (Throwable th3) {
                                        th.addSuppressed(th3);
                                    }
                                } else {
                                    all.close();
                                }
                            }
                            throw th2;
                        }
                    }
                    if (all != null) {
                        if (0 != 0) {
                            try {
                                all.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            all.close();
                        }
                    }
                    MatcherAssert.assertThat(Integer.valueOf(i), CoreMatchers.equalTo(Integer.valueOf(this.numRecordsProcessed)));
                    this.store.put(str2, str2, this.context.timestamp());
                    this.numRecordsProcessed++;
                    return new KeyValue<>(str, str2);
                }

                public void close() {
                }
            };
        }, new String[]{"store-name"});
        Properties properties = new Properties();
        properties.put("auto.offset.reset", "earliest");
        properties.put("default.key.serde", Serdes.String().getClass().getName());
        properties.put("default.value.serde", Serdes.String().getClass().getName());
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        properties.put("commit.interval.ms", 10000L);
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(streamsBuilder.build(), properties, ofEpochMilli);
        TestInputTopic createInputTopic = topologyTestDriver.createInputTopic(TOPIC, Serdes.String().serializer(), Serdes.String().serializer(), ofEpochMilli, Duration.ZERO);
        for (int i = 0; i < 5; i++) {
            createInputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        }
        topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        createInputTopic.advanceTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        for (int i2 = 0; i2 < 5; i2++) {
            createInputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        }
        topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        createInputTopic.advanceTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        for (int i3 = 0; i3 < 5; i3++) {
            createInputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        }
        topologyTestDriver.advanceWallClockTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        createInputTopic.advanceTime(Duration.ofSeconds(DEFAULT_TIMESTAMP));
        for (int i4 = 0; i4 < 5; i4++) {
            createInputTopic.pipeInput(UUID.randomUUID().toString(), UUID.randomUUID().toString());
        }
        topologyTestDriver.close();
    }

    @Test
    public void shouldPutFetchFromCache() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        MatcherAssert.assertThat(this.cachingStore.fetch(bytesKey("a"), DEFAULT_TIMESTAMP), CoreMatchers.equalTo(bytesValue("a")));
        MatcherAssert.assertThat(this.cachingStore.fetch(bytesKey("b"), DEFAULT_TIMESTAMP), CoreMatchers.equalTo(bytesValue("b")));
        MatcherAssert.assertThat(this.cachingStore.fetch(bytesKey("c"), DEFAULT_TIMESTAMP), CoreMatchers.equalTo((Object) null));
        MatcherAssert.assertThat(this.cachingStore.fetch(bytesKey("a"), 0L), CoreMatchers.equalTo((Object) null));
        WindowStoreIterator fetch = this.cachingStore.fetch(bytesKey("a"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
        Throwable th = null;
        try {
            WindowStoreIterator fetch2 = this.cachingStore.fetch(bytesKey("b"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
            Throwable th2 = null;
            try {
                verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP, "a");
                verifyKeyValue((KeyValue) fetch2.next(), DEFAULT_TIMESTAMP, "b");
                Assert.assertFalse(fetch.hasNext());
                Assert.assertFalse(fetch2.hasNext());
                Assert.assertEquals(2L, this.cache.size());
                if (fetch2 != null) {
                    if (0 != 0) {
                        try {
                            fetch2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        fetch2.close();
                    }
                }
                if (fetch != null) {
                    if (0 == 0) {
                        fetch.close();
                        return;
                    }
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                }
            } catch (Throwable th5) {
                if (fetch2 != null) {
                    if (0 != 0) {
                        try {
                            fetch2.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        fetch2.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th7;
        }
    }

    private void verifyKeyValue(KeyValue<Long, byte[]> keyValue, long j, String str) {
        MatcherAssert.assertThat(keyValue.key, CoreMatchers.equalTo(Long.valueOf(j)));
        MatcherAssert.assertThat(keyValue.value, CoreMatchers.equalTo(bytesValue(str)));
    }

    private static byte[] bytesValue(String str) {
        return str.getBytes();
    }

    private static Bytes bytesKey(String str) {
        return Bytes.wrap(str.getBytes());
    }

    private String stringFrom(byte[] bArr) {
        return (String) Serdes.String().deserializer().deserialize("", bArr);
    }

    @Test
    public void shouldPutFetchRangeFromCache() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        KeyValueIterator fetch = this.cachingStore.fetch(bytesKey("a"), bytesKey("b"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(fetch, Arrays.asList(new Windowed(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()))), Arrays.asList("a", "b"));
            Assert.assertEquals(2L, this.cache.size());
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyFrom() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator fetch = this.cachingStore.fetch((Object) null, bytesKey("d"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(fetch, Arrays.asList(new Windowed(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("d"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue()))), Arrays.asList("a", "b", "c", "d"));
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyTo() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator fetch = this.cachingStore.fetch(bytesKey("b"), (Object) null, Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(fetch, Arrays.asList(new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("d"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("e"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue()))), Arrays.asList("b", "c", "d", "e"));
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutFetchRangeFromCacheForNullKeyFromKeyTo() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator fetch = this.cachingStore.fetch((Object) null, (Object) null, Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(fetch, Arrays.asList(new Windowed(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("d"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("e"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue()))), Arrays.asList("a", "b", "c", "d", "e"));
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFrom() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator backwardFetch = this.cachingStore.backwardFetch((Object) null, bytesKey("c"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(backwardFetch, Arrays.asList(new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()))), Arrays.asList("c", "b", "a"));
            if (backwardFetch != null) {
                if (0 == 0) {
                    backwardFetch.close();
                    return;
                }
                try {
                    backwardFetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (backwardFetch != null) {
                if (0 != 0) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyTo() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator backwardFetch = this.cachingStore.backwardFetch(bytesKey("c"), (Object) null, Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(backwardFetch, Arrays.asList(new Windowed(bytesKey("e"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("d"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue()))), Arrays.asList("e", "d", "c"));
            if (backwardFetch != null) {
                if (0 == 0) {
                    backwardFetch.close();
                    return;
                }
                try {
                    backwardFetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (backwardFetch != null) {
                if (0 != 0) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldPutBackwardFetchRangeFromCacheForNullKeyFromKeyTo() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), 20L);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), 30L);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), 30L);
        KeyValueIterator backwardFetch = this.cachingStore.backwardFetch((Object) null, (Object) null, Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(30L));
        Throwable th = null;
        try {
            StreamsTestUtils.verifyAllWindowedKeyValues(backwardFetch, Arrays.asList(new Windowed(bytesKey("e"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("d"), new TimeWindow(30L, 30 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("c"), new TimeWindow(20L, 20 + WINDOW_SIZE.longValue())), new Windowed(bytesKey("b"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), new Windowed(bytesKey("a"), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()))), Arrays.asList("e", "d", "c", "b", "a"));
            if (backwardFetch != null) {
                if (0 == 0) {
                    backwardFetch.close();
                    return;
                }
                try {
                    backwardFetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (backwardFetch != null) {
                if (0 != 0) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldGetAllFromCache() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
        KeyValueIterator all = this.cachingStore.all();
        Throwable th = null;
        try {
            for (String str : new String[]{"a", "b", "c", "d", "e", "f", "g", "h"}) {
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) all.next(), new Windowed(bytesKey(str), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), str);
            }
            Assert.assertFalse(all.hasNext());
            if (all != null) {
                if (0 == 0) {
                    all.close();
                    return;
                }
                try {
                    all.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (all != null) {
                if (0 != 0) {
                    try {
                        all.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    all.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldGetAllBackwardFromCache() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("b"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("c"), bytesValue("c"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("d"), bytesValue("d"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("e"), bytesValue("e"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("f"), bytesValue("f"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("g"), bytesValue("g"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("h"), bytesValue("h"), DEFAULT_TIMESTAMP);
        KeyValueIterator backwardAll = this.cachingStore.backwardAll();
        Throwable th = null;
        try {
            for (String str : new String[]{"h", "g", "f", "e", "d", "c", "b", "a"}) {
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardAll.next(), new Windowed(bytesKey(str), new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), str);
            }
            Assert.assertFalse(backwardAll.hasNext());
            if (backwardAll != null) {
                if (0 == 0) {
                    backwardAll.close();
                    return;
                }
                try {
                    backwardAll.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (backwardAll != null) {
                if (0 != 0) {
                    try {
                        backwardAll.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    backwardAll.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldFetchAllWithinTimestampRange() {
        String[] strArr = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < strArr.length; i++) {
            this.cachingStore.put(bytesKey(strArr[i]), bytesValue(strArr[i]), i);
        }
        KeyValueIterator fetchAll = this.cachingStore.fetchAll(Instant.ofEpochMilli(0L), Instant.ofEpochMilli(7L));
        Throwable th = null;
        try {
            for (int i2 = 0; i2 < strArr.length; i2++) {
                String str = strArr[i2];
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) fetchAll.next(), new Windowed(bytesKey(str), new TimeWindow(i2, i2 + WINDOW_SIZE.longValue())), str);
            }
            Assert.assertFalse(fetchAll.hasNext());
            if (fetchAll != null) {
                if (0 != 0) {
                    try {
                        fetchAll.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    fetchAll.close();
                }
            }
            KeyValueIterator fetchAll2 = this.cachingStore.fetchAll(Instant.ofEpochMilli(2L), Instant.ofEpochMilli(4L));
            Throwable th3 = null;
            for (int i3 = 2; i3 <= 4; i3++) {
                try {
                    try {
                        String str2 = strArr[i3];
                        StreamsTestUtils.verifyWindowedKeyValue((KeyValue) fetchAll2.next(), new Windowed(bytesKey(str2), new TimeWindow(i3, i3 + WINDOW_SIZE.longValue())), str2);
                    } catch (Throwable th4) {
                        th3 = th4;
                        throw th4;
                    }
                } finally {
                }
            }
            Assert.assertFalse(fetchAll2.hasNext());
            if (fetchAll2 != null) {
                if (0 != 0) {
                    try {
                        fetchAll2.close();
                    } catch (Throwable th5) {
                        th3.addSuppressed(th5);
                    }
                } else {
                    fetchAll2.close();
                }
            }
            fetchAll2 = this.cachingStore.fetchAll(Instant.ofEpochMilli(5L), Instant.ofEpochMilli(7L));
            Throwable th6 = null;
            for (int i4 = 5; i4 <= 7; i4++) {
                try {
                    try {
                        String str3 = strArr[i4];
                        StreamsTestUtils.verifyWindowedKeyValue((KeyValue) fetchAll2.next(), new Windowed(bytesKey(str3), new TimeWindow(i4, i4 + WINDOW_SIZE.longValue())), str3);
                    } catch (Throwable th7) {
                        th6 = th7;
                        throw th7;
                    }
                } finally {
                }
            }
            Assert.assertFalse(fetchAll2.hasNext());
            if (fetchAll2 != null) {
                if (0 == 0) {
                    fetchAll2.close();
                    return;
                }
                try {
                    fetchAll2.close();
                } catch (Throwable th8) {
                    th6.addSuppressed(th8);
                }
            }
        } catch (Throwable th9) {
            if (fetchAll != null) {
                if (0 != 0) {
                    try {
                        fetchAll.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    fetchAll.close();
                }
            }
            throw th9;
        }
    }

    @Test
    public void shouldFetchAllBackwardWithinTimestampRange() {
        String[] strArr = {"a", "b", "c", "d", "e", "f", "g", "h"};
        for (int i = 0; i < strArr.length; i++) {
            this.cachingStore.put(bytesKey(strArr[i]), bytesValue(strArr[i]), i);
        }
        KeyValueIterator backwardFetchAll = this.cachingStore.backwardFetchAll(Instant.ofEpochMilli(0L), Instant.ofEpochMilli(7L));
        Throwable th = null;
        try {
            try {
                for (int length = strArr.length - 1; length >= 0; length--) {
                    String str = strArr[length];
                    StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardFetchAll.next(), new Windowed(bytesKey(str), new TimeWindow(length, length + WINDOW_SIZE.longValue())), str);
                }
                Assert.assertFalse(backwardFetchAll.hasNext());
                if (backwardFetchAll != null) {
                    if (0 != 0) {
                        try {
                            backwardFetchAll.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        backwardFetchAll.close();
                    }
                }
                backwardFetchAll = this.cachingStore.backwardFetchAll(Instant.ofEpochMilli(2L), Instant.ofEpochMilli(4L));
                Throwable th3 = null;
                for (int i2 = 4; i2 >= 2; i2--) {
                    try {
                        try {
                            String str2 = strArr[i2];
                            StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardFetchAll.next(), new Windowed(bytesKey(str2), new TimeWindow(i2, i2 + WINDOW_SIZE.longValue())), str2);
                        } catch (Throwable th4) {
                            th3 = th4;
                            throw th4;
                        }
                    } finally {
                    }
                }
                Assert.assertFalse(backwardFetchAll.hasNext());
                if (backwardFetchAll != null) {
                    if (0 != 0) {
                        try {
                            backwardFetchAll.close();
                        } catch (Throwable th5) {
                            th3.addSuppressed(th5);
                        }
                    } else {
                        backwardFetchAll.close();
                    }
                }
                backwardFetchAll = this.cachingStore.backwardFetchAll(Instant.ofEpochMilli(5L), Instant.ofEpochMilli(7L));
                Throwable th6 = null;
                for (int i3 = 7; i3 >= 5; i3--) {
                    try {
                        try {
                            String str3 = strArr[i3];
                            StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardFetchAll.next(), new Windowed(bytesKey(str3), new TimeWindow(i3, i3 + WINDOW_SIZE.longValue())), str3);
                        } catch (Throwable th7) {
                            th6 = th7;
                            throw th7;
                        }
                    } finally {
                    }
                }
                Assert.assertFalse(backwardFetchAll.hasNext());
                if (backwardFetchAll != null) {
                    if (0 == 0) {
                        backwardFetchAll.close();
                        return;
                    }
                    try {
                        backwardFetchAll.close();
                    } catch (Throwable th8) {
                        th6.addSuppressed(th8);
                    }
                }
            } catch (Throwable th9) {
                th = th9;
                throw th9;
            }
        } finally {
            if (backwardFetchAll != null) {
                if (th != null) {
                    try {
                        backwardFetchAll.close();
                    } catch (Throwable th10) {
                        th.addSuppressed(th10);
                    }
                } else {
                    backwardFetchAll.close();
                }
            }
        }
    }

    @Test
    public void shouldFlushEvictedItemsIntoUnderlyingStore() {
        int addItemsToCache = addItemsToCache();
        KeyValueIterator fetch = this.bytesStore.fetch(Bytes.wrap("0".getBytes(StandardCharsets.UTF_8)), DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP);
        Throwable th = null;
        try {
            try {
                KeyValue keyValue = (KeyValue) fetch.next();
                Assert.assertEquals(DEFAULT_TIMESTAMP, this.keySchema.segmentTimestamp((Bytes) keyValue.key));
                Assert.assertArrayEquals("0".getBytes(), (byte[]) keyValue.value);
                Assert.assertFalse(fetch.hasNext());
                Assert.assertEquals(addItemsToCache - 1, this.cache.size());
                if (fetch != null) {
                    if (0 == 0) {
                        fetch.close();
                        return;
                    }
                    try {
                        fetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldForwardDirtyItemsWhenFlushCalled() {
        Windowed windowed = new Windowed("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertEquals("a", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
    }

    @Test
    public void shouldSetFlushListener() {
        Assert.assertTrue(this.cachingStore.setFlushListener((CacheFlushListener) null, true));
        Assert.assertTrue(this.cachingStore.setFlushListener((CacheFlushListener) null, false));
    }

    @Test
    public void shouldForwardOldValuesWhenEnabled() {
        this.cachingStore.setFlushListener(this.cacheListener, true);
        Windowed windowed = new Windowed("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertEquals("b", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertEquals("c", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertEquals("b", this.cacheListener.forwarded.get(windowed).oldValue);
        this.cachingStore.put(bytesKey("1"), (byte[]) null, DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertEquals("c", this.cacheListener.forwarded.get(windowed).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), (byte[]) null, DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowed));
        this.cacheListener.forwarded.clear();
    }

    @Test
    public void shouldForwardOldValuesWhenDisabled() {
        Windowed windowed = new Windowed("1", new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertEquals("b", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
        this.cachingStore.put(bytesKey("1"), bytesValue("c"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertEquals("c", this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
        this.cachingStore.put(bytesKey("1"), (byte[]) null, DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).newValue);
        Assert.assertNull(this.cacheListener.forwarded.get(windowed).oldValue);
        this.cacheListener.forwarded.clear();
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), (byte[]) null, DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        Assert.assertNull(this.cacheListener.forwarded.get(windowed));
        this.cacheListener.forwarded.clear();
    }

    @Test
    public void shouldForwardDirtyItemToListenerWhenEvicted() {
        Assert.assertEquals(addItemsToCache(), this.cacheListener.forwarded.size());
    }

    @Test
    public void shouldTakeValueFromCacheIfSameTimestampFlushedToRocks() {
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.flush();
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP);
        WindowStoreIterator fetch = this.cachingStore.fetch(bytesKey("1"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
        Throwable th = null;
        try {
            verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP, "b");
            Assert.assertFalse(fetch.hasNext());
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldIterateAcrossWindows() {
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch(bytesKey("1"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP, "a");
            verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), "b");
            Assert.assertFalse(fetch.hasNext());
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldIterateBackwardAcrossWindows() {
        this.cachingStore.put(bytesKey("1"), bytesValue("a"), DEFAULT_TIMESTAMP);
        this.cachingStore.put(bytesKey("1"), bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        WindowStoreIterator backwardFetch = this.cachingStore.backwardFetch(bytesKey("1"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            verifyKeyValue((KeyValue) backwardFetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), "b");
            verifyKeyValue((KeyValue) backwardFetch.next(), DEFAULT_TIMESTAMP, "a");
            Assert.assertFalse(backwardFetch.hasNext());
            if (backwardFetch != null) {
                if (0 == 0) {
                    backwardFetch.close();
                    return;
                }
                try {
                    backwardFetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (backwardFetch != null) {
                if (0 != 0) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldIterateCacheAndStore() {
        Bytes wrap = Bytes.wrap("1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary(wrap, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        this.cachingStore.put(wrap, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        WindowStoreIterator fetch = this.cachingStore.fetch(bytesKey("1"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            try {
                verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP, "a");
                verifyKeyValue((KeyValue) fetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), "b");
                Assert.assertFalse(fetch.hasNext());
                if (fetch != null) {
                    if (0 == 0) {
                        fetch.close();
                        return;
                    }
                    try {
                        fetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldIterateBackwardCacheAndStore() {
        Bytes wrap = Bytes.wrap("1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary(wrap, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        this.cachingStore.put(wrap, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        WindowStoreIterator backwardFetch = this.cachingStore.backwardFetch(bytesKey("1"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            try {
                verifyKeyValue((KeyValue) backwardFetch.next(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), "b");
                verifyKeyValue((KeyValue) backwardFetch.next(), DEFAULT_TIMESTAMP, "a");
                Assert.assertFalse(backwardFetch.hasNext());
                if (backwardFetch != null) {
                    if (0 == 0) {
                        backwardFetch.close();
                        return;
                    }
                    try {
                        backwardFetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (backwardFetch != null) {
                if (th != null) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldIterateCacheAndStoreKeyRange() {
        Bytes wrap = Bytes.wrap("1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary(wrap, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        this.cachingStore.put(wrap, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        KeyValueIterator fetch = this.cachingStore.fetch(wrap, bytesKey("2"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            try {
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) fetch.next(), new Windowed(wrap, new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), "a");
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) fetch.next(), new Windowed(wrap, new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue() + WINDOW_SIZE.longValue())), "b");
                Assert.assertFalse(fetch.hasNext());
                if (fetch != null) {
                    if (0 == 0) {
                        fetch.close();
                        return;
                    }
                    try {
                        fetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fetch != null) {
                if (th != null) {
                    try {
                        fetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldIterateBackwardCacheAndStoreKeyRange() {
        Bytes wrap = Bytes.wrap("1".getBytes());
        this.bytesStore.put(WindowKeySchema.toStoreKeyBinary(wrap, DEFAULT_TIMESTAMP, 0), "a".getBytes());
        this.cachingStore.put(wrap, bytesValue("b"), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue());
        KeyValueIterator backwardFetch = this.cachingStore.backwardFetch(wrap, bytesKey("2"), Instant.ofEpochMilli(DEFAULT_TIMESTAMP), Instant.ofEpochMilli(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue()));
        Throwable th = null;
        try {
            try {
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardFetch.next(), new Windowed(wrap, new TimeWindow(DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue(), DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue() + WINDOW_SIZE.longValue())), "b");
                StreamsTestUtils.verifyWindowedKeyValue((KeyValue) backwardFetch.next(), new Windowed(wrap, new TimeWindow(DEFAULT_TIMESTAMP, DEFAULT_TIMESTAMP + WINDOW_SIZE.longValue())), "a");
                Assert.assertFalse(backwardFetch.hasNext());
                if (backwardFetch != null) {
                    if (0 == 0) {
                        backwardFetch.close();
                        return;
                    }
                    try {
                        backwardFetch.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (backwardFetch != null) {
                if (th != null) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldClearNamespaceCacheOnClose() {
        this.cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
        Assert.assertEquals(1L, this.cache.size());
        this.cachingStore.close();
        Assert.assertEquals(0L, this.cache.size());
    }

    @Test
    public void shouldThrowIfTryingToFetchFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.fetch(bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
        });
    }

    @Test
    public void shouldThrowIfTryingToFetchRangeFromClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.fetch(bytesKey("a"), bytesKey("b"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
        });
    }

    @Test
    public void shouldThrowIfTryingToWriteToClosedCachingStore() {
        this.cachingStore.close();
        Assert.assertThrows(InvalidStateStoreException.class, () -> {
            this.cachingStore.put(bytesKey("a"), bytesValue("a"), 0L);
        });
    }

    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0003"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(KeyValue.pair(0L, bytesValue("0001")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(Long.valueOf(SEGMENT_INTERVAL), bytesValue("0005"))), StreamsTestUtils.toList(this.cachingStore.fetch(bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
    }

    @Test
    public void shouldBackwardFetchAndIterateOverExactKeys() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0003"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(KeyValue.pair(Long.valueOf(SEGMENT_INTERVAL), bytesValue("0005")), KeyValue.pair(1L, bytesValue("0003")), KeyValue.pair(0L, bytesValue("0001"))), StreamsTestUtils.toList(this.cachingStore.backwardFetch(bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
    }

    @Test
    public void shouldFetchAndIterateOverKeyRange() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0003"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("a", "0001", 0L), windowedPair("a", "0003", 1L), windowedPair("a", "0005", SEGMENT_INTERVAL)), StreamsTestUtils.toList(this.cachingStore.fetch(bytesKey("a"), bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("aa", "0002", 0L), windowedPair("aa", "0004", 1L)), StreamsTestUtils.toList(this.cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("a", "0001", 0L), windowedPair("a", "0003", 1L), windowedPair("aa", "0002", 0L), windowedPair("aa", "0004", 1L), windowedPair("a", "0005", SEGMENT_INTERVAL)), StreamsTestUtils.toList(this.cachingStore.fetch(bytesKey("a"), bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
    }

    @Test
    public void shouldFetchAndIterateOverKeyBackwardRange() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 0L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0003"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0004"), 1L);
        this.cachingStore.put(bytesKey("a"), bytesValue("0005"), SEGMENT_INTERVAL);
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("a", "0005", SEGMENT_INTERVAL), windowedPair("a", "0003", 1L), windowedPair("a", "0001", 0L)), StreamsTestUtils.toList(this.cachingStore.backwardFetch(bytesKey("a"), bytesKey("a"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("aa", "0004", 1L), windowedPair("aa", "0002", 0L)), StreamsTestUtils.toList(this.cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
        StreamsTestUtils.verifyKeyValueList(Arrays.asList(windowedPair("a", "0005", SEGMENT_INTERVAL), windowedPair("aa", "0004", 1L), windowedPair("aa", "0002", 0L), windowedPair("a", "0003", 1L), windowedPair("a", "0001", 0L)), StreamsTestUtils.toList(this.cachingStore.backwardFetch(bytesKey("a"), bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))));
    }

    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2L);
        this.cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3L);
        WindowStoreIterator fetch = this.cachingStore.fetch(bytesKey("aa"), 0L, 5L);
        Throwable th = null;
        try {
            KeyValueIterator fetch2 = this.cachingStore.fetch(bytesKey("aa"), bytesKey("aa"), 0L, 5L);
            Throwable th2 = null;
            try {
                try {
                    Assert.assertEquals(stringFrom((byte[]) ((KeyValue) fetch.next()).value), stringFrom((byte[]) ((KeyValue) fetch2.next()).value));
                    Assert.assertEquals(stringFrom((byte[]) ((KeyValue) fetch.next()).value), stringFrom((byte[]) ((KeyValue) fetch2.next()).value));
                    Assert.assertFalse(fetch.hasNext());
                    Assert.assertFalse(fetch2.hasNext());
                    if (fetch2 != null) {
                        if (0 != 0) {
                            try {
                                fetch2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            fetch2.close();
                        }
                    }
                    if (fetch != null) {
                        if (0 == 0) {
                            fetch.close();
                            return;
                        }
                        try {
                            fetch.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (fetch2 != null) {
                    if (th2 != null) {
                        try {
                            fetch2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        fetch2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeBackwardFetch() {
        this.cachingStore.put(bytesKey("a"), bytesValue("0001"), 0L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0002"), 1L);
        this.cachingStore.put(bytesKey("aa"), bytesValue("0003"), 2L);
        this.cachingStore.put(bytesKey("aaa"), bytesValue("0004"), 3L);
        WindowStoreIterator backwardFetch = this.cachingStore.backwardFetch(bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
        Throwable th = null;
        try {
            KeyValueIterator backwardFetch2 = this.cachingStore.backwardFetch(bytesKey("aa"), bytesKey("aa"), Instant.ofEpochMilli(0L), Instant.ofEpochMilli(5L));
            Throwable th2 = null;
            try {
                try {
                    Assert.assertEquals(stringFrom((byte[]) ((KeyValue) backwardFetch.next()).value), stringFrom((byte[]) ((KeyValue) backwardFetch2.next()).value));
                    Assert.assertEquals(stringFrom((byte[]) ((KeyValue) backwardFetch.next()).value), stringFrom((byte[]) ((KeyValue) backwardFetch2.next()).value));
                    Assert.assertFalse(backwardFetch.hasNext());
                    Assert.assertFalse(backwardFetch2.hasNext());
                    if (backwardFetch2 != null) {
                        if (0 != 0) {
                            try {
                                backwardFetch2.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            backwardFetch2.close();
                        }
                    }
                    if (backwardFetch != null) {
                        if (0 == 0) {
                            backwardFetch.close();
                            return;
                        }
                        try {
                            backwardFetch.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (backwardFetch2 != null) {
                    if (th2 != null) {
                        try {
                            backwardFetch2.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        backwardFetch2.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (backwardFetch != null) {
                if (0 != 0) {
                    try {
                        backwardFetch.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    backwardFetch.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.put((Bytes) null, bytesValue("anyValue"), 0L);
        });
    }

    @Test
    public void shouldNotThrowNullPointerExceptionOnPutNullValue() {
        this.cachingStore.put(bytesKey("a"), (byte[]) null, 0L);
    }

    @Test
    public void shouldThrowNullPointerExceptionOnFetchNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.cachingStore.fetch((Object) null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
        });
    }

    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        Bytes wrap = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        Bytes wrap2 = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(CachingWindowStore.class);
        Throwable th = null;
        try {
            KeyValueIterator fetch = this.cachingStore.fetch(wrap, wrap2, 0L, DEFAULT_TIMESTAMP);
            Throwable th2 = null;
            try {
                try {
                    Assert.assertFalse(fetch.hasNext());
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers"));
                    if (fetch != null) {
                        if (0 != 0) {
                            try {
                                fetch.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            fetch.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (fetch != null) {
                    if (th2 != null) {
                        try {
                            fetch.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        fetch.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldNotThrowInvalidBackwardRangeExceptionWithNegativeFromKey() {
        Bytes wrap = Bytes.wrap(Serdes.Integer().serializer().serialize("", -1));
        Bytes wrap2 = Bytes.wrap(Serdes.Integer().serializer().serialize("", 1));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(CachingWindowStore.class);
        Throwable th = null;
        try {
            KeyValueIterator backwardFetch = this.cachingStore.backwardFetch(wrap, wrap2, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(DEFAULT_TIMESTAMP));
            Throwable th2 = null;
            try {
                try {
                    Assert.assertFalse(backwardFetch.hasNext());
                    MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers"));
                    if (backwardFetch != null) {
                        if (0 != 0) {
                            try {
                                backwardFetch.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            backwardFetch.close();
                        }
                    }
                    if (createAndRegister != null) {
                        if (0 == 0) {
                            createAndRegister.close();
                            return;
                        }
                        try {
                            createAndRegister.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (backwardFetch != null) {
                    if (th2 != null) {
                        try {
                            backwardFetch.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        backwardFetch.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createAndRegister != null) {
                if (0 != 0) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th8;
        }
    }

    @Test
    public void shouldCloseCacheAndWrappedStoreAfterErrorDuringCacheFlush() {
        setUpCloseTests();
        EasyMock.reset(new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on flush"));
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay(new Object[]{this.cache});
        EasyMock.reset(new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.replay(new Object[]{this.underlyingStore});
        CachingWindowStore cachingWindowStore = this.cachingStore;
        cachingWindowStore.getClass();
        Assert.assertThrows(RuntimeException.class, cachingWindowStore::close);
        EasyMock.verify(new Object[]{this.cache, this.underlyingStore});
    }

    @Test
    public void shouldCloseWrappedStoreAfterErrorDuringCacheClose() {
        setUpCloseTests();
        EasyMock.reset(new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(new Object[]{this.cache});
        EasyMock.reset(new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.replay(new Object[]{this.underlyingStore});
        CachingWindowStore cachingWindowStore = this.cachingStore;
        cachingWindowStore.getClass();
        Assert.assertThrows(RuntimeException.class, cachingWindowStore::close);
        EasyMock.verify(new Object[]{this.cache, this.underlyingStore});
    }

    @Test
    public void shouldCloseCacheAfterErrorDuringStateStoreClose() {
        setUpCloseTests();
        EasyMock.reset(new Object[]{this.cache});
        this.cache.flush(CACHE_NAMESPACE);
        this.cache.close(CACHE_NAMESPACE);
        EasyMock.replay(new Object[]{this.cache});
        EasyMock.reset(new Object[]{this.underlyingStore});
        this.underlyingStore.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("Simulating an error on close"));
        EasyMock.replay(new Object[]{this.underlyingStore});
        CachingWindowStore cachingWindowStore = this.cachingStore;
        cachingWindowStore.getClass();
        Assert.assertThrows(RuntimeException.class, cachingWindowStore::close);
        EasyMock.verify(new Object[]{this.cache, this.underlyingStore});
    }

    private void setUpCloseTests() {
        this.underlyingStore = (WindowStore) EasyMock.createNiceMock(WindowStore.class);
        EasyMock.expect(this.underlyingStore.name()).andStubReturn("store-name");
        EasyMock.expect(Boolean.valueOf(this.underlyingStore.isOpen())).andStubReturn(true);
        EasyMock.replay(new Object[]{this.underlyingStore});
        this.cachingStore = new CachingWindowStore(this.underlyingStore, WINDOW_SIZE.longValue(), SEGMENT_INTERVAL);
        this.cache = (ThreadCache) EasyMock.createNiceMock(ThreadCache.class);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), null, null, null, this.cache);
        this.context.setRecordContext(new ProcessorRecordContext(DEFAULT_TIMESTAMP, 0L, 0, TOPIC, new RecordHeaders()));
        this.cachingStore.init(this.context, this.cachingStore);
    }

    private static KeyValue<Windowed<Bytes>, byte[]> windowedPair(String str, String str2, long j) {
        return KeyValue.pair(new Windowed(bytesKey(str), new TimeWindow(j, j + WINDOW_SIZE.longValue())), bytesValue(str2));
    }

    private int addItemsToCache() {
        int i = 0;
        int i2 = 0;
        while (i < MAX_CACHE_SIZE_BYTES) {
            int i3 = i2;
            i2++;
            String valueOf = String.valueOf(i3);
            this.cachingStore.put(bytesKey(valueOf), bytesValue(valueOf), DEFAULT_TIMESTAMP);
            i = (int) (i + ThreadCacheTest.memoryCacheEntrySize(valueOf.getBytes(), valueOf.getBytes(), TOPIC) + 8 + 4);
        }
        return i2;
    }
}
