package org.apache.flink.table.connector.source.lookup.cache;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/connector/source/lookup/cache/DefaultLookupCacheTest.class */
class DefaultLookupCacheTest {
    private static final RowData KEY = GenericRowData.of(new Object[]{"foo", "lookup", "key"});
    private static final RowData NON_EXIST_KEY = GenericRowData.of(new Object[]{"non-exist"});
    private static final Collection<RowData> VALUE = Arrays.asList(GenericRowData.of(new Object[]{"bar", "lookup", "value", 0}), GenericRowData.of(new Object[]{"bar", "lookup", "value", 1}), GenericRowData.of(new Object[]{"bar", "lookup", "value", 2}), GenericRowData.of(new Object[]{"bar", "lookup", "value", 3}));

    DefaultLookupCacheTest() {
    }

    @Test
    void testBasicReadWriteInCache() throws Exception {
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));
        try {
            createCache.put(KEY, VALUE);
            Assertions.assertThat(createCache.getIfPresent(NON_EXIST_KEY)).isNull();
            Assertions.assertThat(createCache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
            if (createCache != null) {
                createCache.close();
            }
        } catch (Throwable th) {
            if (createCache != null) {
                try {
                    createCache.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testExpireAfterAccess() throws Exception {
        Duration ofSeconds = Duration.ofSeconds(15213L);
        Duration ofSeconds2 = Duration.ofSeconds(10L);
        ManualClock manualClock = new ManualClock();
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().expireAfterAccess(ofSeconds), manualClock);
        try {
            createCache.put(KEY, VALUE);
            manualClock.advanceTime(ofSeconds.minus(ofSeconds2));
            Assertions.assertThat(createCache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
            manualClock.advanceTime(ofSeconds.minus(ofSeconds2));
            Assertions.assertThat(createCache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
            manualClock.advanceTime(ofSeconds.plus(ofSeconds2));
            Assertions.assertThat(createCache.getIfPresent(KEY)).isNull();
            if (createCache != null) {
                createCache.close();
            }
        } catch (Throwable th) {
            if (createCache != null) {
                try {
                    createCache.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testExpireAfterWrite() throws Exception {
        Duration ofSeconds = Duration.ofSeconds(15213L);
        Duration ofSeconds2 = Duration.ofSeconds(10L);
        ManualClock manualClock = new ManualClock();
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().expireAfterWrite(ofSeconds), manualClock);
        try {
            createCache.put(KEY, VALUE);
            Assertions.assertThat(createCache.getIfPresent(KEY)).containsAll(VALUE);
            manualClock.advanceTime(ofSeconds.plus(ofSeconds2));
            Assertions.assertThat(createCache.getIfPresent(KEY)).isNull();
            if (createCache != null) {
                createCache.close();
            }
        } catch (Throwable th) {
            if (createCache != null) {
                try {
                    createCache.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testSizeBasedEviction() throws Exception {
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().maximumSize(10));
        for (int i = 0; i < 10; i++) {
            try {
                createCache.put(GenericRowData.of(new Object[]{"lookup", "key", Integer.valueOf(i)}), VALUE);
            } catch (Throwable th) {
                if (createCache != null) {
                    try {
                        createCache.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        createCache.put(GenericRowData.of(new Object[]{"lookup", "key", 10}), VALUE);
        Assertions.assertThat(createCache.getIfPresent(GenericRowData.of(new Object[]{"lookup", "key", 0}))).isNull();
        for (int i2 = 1; i2 < 10 + 1; i2++) {
            Assertions.assertThat(createCache.getIfPresent(GenericRowData.of(new Object[]{"lookup", "key", Integer.valueOf(i2)}))).isNotNull();
        }
        if (createCache != null) {
            createCache.close();
        }
    }

    @Test
    void testCacheMissingKey() throws Exception {
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));
        try {
            Assertions.assertThatThrownBy(() -> {
                createCache.put((RowData) null, VALUE);
            }).isInstanceOf(NullPointerException.class).hasMessage("Cannot put an entry with null key into the cache");
            Assertions.assertThatThrownBy(() -> {
                createCache.put(KEY, (Collection) null);
            }).isInstanceOf(NullPointerException.class).hasMessage("Cannot put an entry with null value into the cache");
            createCache.put(KEY, Collections.emptyList());
            Assertions.assertThat(createCache.getIfPresent(KEY)).isNotNull().isEmpty();
            if (createCache != null) {
                createCache.close();
            }
            createCache = createCache(DefaultLookupCache.newBuilder().cacheMissingKey(false).maximumSize(Long.MAX_VALUE));
            try {
                createCache.put(KEY, Collections.emptyList());
                Assertions.assertThat(createCache.getIfPresent(KEY)).isNull();
                if (createCache != null) {
                    createCache.close();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    void testCacheMetrics() throws Exception {
        InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
        DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE).maximumSize(Long.MAX_VALUE), null, interceptingCacheMetricGroup);
        try {
            Assertions.assertThat(interceptingCacheMetricGroup.hitCounter).isNotNull();
            Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.missCounter).isNotNull();
            Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.numCachedRecordsGauge).isNotNull();
            Assertions.assertThat((Long) interceptingCacheMetricGroup.numCachedRecordsGauge.getValue()).isEqualTo(0L);
            Assertions.assertThat(interceptingCacheMetricGroup.loadCounter).isNull();
            Assertions.assertThat(interceptingCacheMetricGroup.numLoadFailuresCounter).isNull();
            Assertions.assertThat(interceptingCacheMetricGroup.latestLoadTimeGauge).isNull();
            Assertions.assertThat(interceptingCacheMetricGroup.numCachedBytesGauge).isNull();
            createCache.put(KEY, VALUE);
            Assertions.assertThat((Long) interceptingCacheMetricGroup.numCachedRecordsGauge.getValue()).isEqualTo(1L);
            createCache.getIfPresent(KEY);
            Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(1L);
            createCache.getIfPresent(NON_EXIST_KEY);
            Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(1L);
            if (createCache != null) {
                createCache.close();
            }
        } catch (Throwable th) {
            if (createCache != null) {
                try {
                    createCache.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testCacheSerialization() throws Exception {
        DefaultLookupCache createCopySerializable = CommonTestUtils.createCopySerializable(DefaultLookupCache.newBuilder().cacheMissingKey(true).maximumSize(15213L).expireAfterWrite(Duration.ofMillis(18213L)).expireAfterAccess(Duration.ofMillis(15513L)).build());
        Assertions.assertThat(createCopySerializable.isCacheMissingKey()).isEqualTo(true);
        Assertions.assertThat(createCopySerializable.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat(createCopySerializable.getExpireAfterWriteDuration()).isEqualTo(Duration.ofMillis(18213L));
        Assertions.assertThat(createCopySerializable.getExpireAfterAccessDuration()).isEqualTo(Duration.ofMillis(15513L));
    }

    @Test
    void testConcurrentAccess() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4 * 2);
        InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
        try {
            DefaultLookupCache createCache = createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));
            try {
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < 4; i++) {
                    String str = "key-" + i;
                    String str2 = "value-" + i;
                    arrayList.add(runAsync(() -> {
                        createCache.open(interceptingCacheMetricGroup);
                        createCache.put(GenericRowData.of(new Object[]{str}), Collections.singleton(GenericRowData.of(new Object[]{str2})));
                    }, newFixedThreadPool));
                }
                FutureUtils.waitForAll(arrayList).get();
                arrayList.clear();
                for (int i2 = 0; i2 < 4; i2++) {
                    String str3 = "key-" + i2;
                    String str4 = "value-" + i2;
                    arrayList.add(runAsync(() -> {
                        createCache.open(interceptingCacheMetricGroup);
                        Assertions.assertThat(createCache.getIfPresent(GenericRowData.of(new Object[]{str3}))).isEqualTo(Collections.singleton(GenericRowData.of(new Object[]{str4})));
                    }, newFixedThreadPool));
                    arrayList.add(runAsync(() -> {
                        createCache.open(interceptingCacheMetricGroup);
                        Assertions.assertThat(createCache.getIfPresent(NON_EXIST_KEY)).isNull();
                    }, newFixedThreadPool));
                }
                FutureUtils.waitForAll(arrayList).get();
                Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(4);
                Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(4);
                Assertions.assertThat((Long) interceptingCacheMetricGroup.numCachedRecordsGauge.getValue()).isEqualTo(4);
                if (createCache != null) {
                    createCache.close();
                }
            } finally {
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    @Test
    void testBuildFromConfig() {
        Configuration configuration = new Configuration();
        configuration.set(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.PARTIAL);
        configuration.set(LookupOptions.PARTIAL_CACHE_MAX_ROWS, 15213L);
        configuration.set(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, Duration.ofMillis(18213L));
        configuration.set(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, Duration.ofMillis(15513L));
        configuration.set(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, false);
        DefaultLookupCache fromConfig = DefaultLookupCache.fromConfig(configuration);
        Assertions.assertThat(fromConfig.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat(fromConfig.getExpireAfterWriteDuration()).isEqualTo(Duration.ofMillis(18213L));
        Assertions.assertThat(fromConfig.getExpireAfterAccessDuration()).isEqualTo(Duration.ofMillis(15513L));
        Assertions.assertThat(fromConfig.isCacheMissingKey()).isFalse();
        Configuration configuration2 = new Configuration();
        configuration2.set(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.NONE);
        Assertions.assertThatThrownBy(() -> {
            DefaultLookupCache.fromConfig(configuration2);
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("'lookup.cache' should be 'PARTIAL' in order to build a default lookup cache");
        Configuration configuration3 = new Configuration();
        configuration3.set(LookupOptions.CACHE_TYPE, LookupOptions.LookupCacheType.PARTIAL);
        Assertions.assertThatThrownBy(() -> {
            DefaultLookupCache.fromConfig(configuration3);
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("Missing 'lookup.partial-cache.expire-after-access', 'lookup.partial-cache.expire-after-write' or 'lookup.partial-cache.max-rows' in the configuration. The cache will not have evictions under this configuration and could lead to potential memory issues as the cache size may grow indefinitely.");
    }

    @Test
    void testBuilder() {
        DefaultLookupCache build = DefaultLookupCache.newBuilder().cacheMissingKey(true).maximumSize(15213L).expireAfterWrite(Duration.ofMillis(18213L)).expireAfterAccess(Duration.ofMillis(15513L)).build();
        Assertions.assertThat(build.isCacheMissingKey()).isEqualTo(true);
        Assertions.assertThat(build.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat(build.getExpireAfterWriteDuration()).isEqualTo(Duration.ofMillis(18213L));
        Assertions.assertThat(build.getExpireAfterAccessDuration()).isEqualTo(Duration.ofMillis(15513L));
        Assertions.assertThatThrownBy(() -> {
            DefaultLookupCache.newBuilder().build();
        }).isInstanceOf(IllegalArgumentException.class).hasMessage("Expiration duration and maximum size are not set for the cache. The cache will not have any eviction and could lead to potential memory issues as the cache size may grow infinitely.");
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder) throws Exception {
        return createCache(builder, null, null);
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder, Clock clock) throws Exception {
        return createCache(builder, clock, null);
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder, Clock clock, CacheMetricGroup cacheMetricGroup) throws Exception {
        DefaultLookupCache createCopySerializable = CommonTestUtils.createCopySerializable(builder.build());
        if (clock != null) {
            createCopySerializable.withClock(clock);
        }
        if (cacheMetricGroup == null) {
            createCopySerializable.open(UnregisteredMetricsGroup.createCacheMetricGroup());
        } else {
            createCopySerializable.open(cacheMetricGroup);
        }
        return createCopySerializable;
    }

    private CompletableFuture<Void> runAsync(Runnable runnable, ExecutorService executorService) {
        return CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> {
            Thread.sleep(ThreadLocalRandom.current().nextLong(0L, 10L));
            runnable.run();
        }), executorService);
    }
}
