package org.apache.flink.table.runtime.functions.table.fullcache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.functions.table.lookup.fullcache.LookupFullCache;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/functions/table/fullcache/LookupFullCacheTest.class */
public class LookupFullCacheTest {
    private final TestManualCacheReloadTrigger reloadTrigger = new TestManualCacheReloadTrigger();

    @Test
    void testAddDataAfterLoad() throws Exception {
        RowData row = StreamRecordUtils.row(2);
        List singletonList = Collections.singletonList(StreamRecordUtils.row(2, "Alex"));
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
        });
        LookupFullCache createAndLoadCache = createAndLoadCache(testCacheLoader);
        Throwable th = null;
        try {
            try {
                Collection ifPresent = createAndLoadCache.getIfPresent(row);
                Assertions.assertThat(ifPresent).isNotNull();
                Assertions.assertThat(ifPresent.size()).isEqualTo(0);
                this.reloadTrigger.trigger();
                Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(2);
                Assertions.assertThat(createAndLoadCache.getIfPresent(row)).isEqualTo(singletonList);
                if (createAndLoadCache != null) {
                    if (0 != 0) {
                        try {
                            createAndLoadCache.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndLoadCache.close();
                    }
                }
                Assertions.assertThat(this.reloadTrigger.isClosed()).isTrue();
                Assertions.assertThat(testCacheLoader.isStopped()).isTrue();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndLoadCache != null) {
                if (th != null) {
                    try {
                        createAndLoadCache.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndLoadCache.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testUpdateDataAfterLoad() throws Exception {
        RowData row = StreamRecordUtils.row(4);
        List singletonList = Collections.singletonList(StreamRecordUtils.row(4, "Frank"));
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
        });
        LookupFullCache createAndLoadCache = createAndLoadCache(testCacheLoader);
        Throwable th = null;
        try {
            try {
                Assertions.assertThat(createAndLoadCache.getIfPresent(row)).isEqualTo(TestCacheLoader.DATA.get(row));
                this.reloadTrigger.trigger();
                Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(2);
                Assertions.assertThat(createAndLoadCache.getIfPresent(row)).isEqualTo(singletonList);
                if (createAndLoadCache != null) {
                    if (0 != 0) {
                        try {
                            createAndLoadCache.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndLoadCache.close();
                    }
                }
                Assertions.assertThat(this.reloadTrigger.isClosed()).isTrue();
                Assertions.assertThat(testCacheLoader.isStopped()).isTrue();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndLoadCache != null) {
                if (th != null) {
                    try {
                        createAndLoadCache.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndLoadCache.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testRemoveDataAfterLoad() throws Exception {
        RowData row = StreamRecordUtils.row(1);
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
        });
        LookupFullCache createAndLoadCache = createAndLoadCache(testCacheLoader);
        Throwable th = null;
        try {
            try {
                Assertions.assertThat(createAndLoadCache.getIfPresent(row)).isEqualTo(TestCacheLoader.DATA.get(row));
                this.reloadTrigger.trigger();
                Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(2);
                Collection ifPresent = createAndLoadCache.getIfPresent(row);
                Assertions.assertThat(ifPresent).isNotNull();
                Assertions.assertThat(ifPresent.size()).isEqualTo(0);
                if (createAndLoadCache != null) {
                    if (0 != 0) {
                        try {
                            createAndLoadCache.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndLoadCache.close();
                    }
                }
                Assertions.assertThat(this.reloadTrigger.isClosed()).isTrue();
                Assertions.assertThat(testCacheLoader.isStopped()).isTrue();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndLoadCache != null) {
                if (th != null) {
                    try {
                        createAndLoadCache.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndLoadCache.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testExceptionDuringReload() throws Exception {
        RuntimeException runtimeException = new RuntimeException("Reload failed.");
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
            throw runtimeException;
        });
        LookupFullCache createAndLoadCache = createAndLoadCache(testCacheLoader);
        Throwable th = null;
        try {
            try {
                this.reloadTrigger.trigger();
                Assertions.assertThat(testCacheLoader.isStopped()).isTrue();
                Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(2);
                Assertions.assertThatThrownBy(() -> {
                    createAndLoadCache.getIfPresent(StreamRecordUtils.row(1));
                }).hasRootCause(runtimeException);
                this.reloadTrigger.trigger();
                Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(2);
                if (createAndLoadCache != null) {
                    if (0 != 0) {
                        try {
                            createAndLoadCache.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndLoadCache.close();
                    }
                }
                Assertions.assertThat(this.reloadTrigger.isClosed()).isTrue();
                Assertions.assertThat(testCacheLoader.isStopped()).isTrue();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndLoadCache != null) {
                if (th != null) {
                    try {
                        createAndLoadCache.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndLoadCache.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testUnsupportedOperations() {
        LookupFullCache lookupFullCache = new LookupFullCache(new TestCacheLoader(map -> {
        }), this.reloadTrigger);
        Assertions.assertThatThrownBy(() -> {
            lookupFullCache.invalidate(StreamRecordUtils.row(1));
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("invalidate");
        Assertions.assertThatThrownBy(() -> {
            lookupFullCache.put(StreamRecordUtils.row(1), Collections.singletonList(StreamRecordUtils.row(1, "Julian")));
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("put");
    }

    @Test
    void testCacheMetrics() throws Exception {
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
        });
        InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
        LookupFullCache createAndLoadCache = createAndLoadCache(testCacheLoader, interceptingCacheMetricGroup);
        Assertions.assertThat(interceptingCacheMetricGroup.hitCounter).isNotNull();
        Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(interceptingCacheMetricGroup.missCounter).isNotNull();
        Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(createAndLoadCache.getIfPresent(StreamRecordUtils.row(1))).isNotEmpty();
        Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(1L);
        Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(0L);
        Assertions.assertThat(createAndLoadCache.getIfPresent(StreamRecordUtils.row(2))).isEmpty();
        Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(2L);
        Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isEqualTo(0L);
    }

    @Test
    void testConcurrentAccess() throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(4);
        TestCacheLoader testCacheLoader = new TestCacheLoader(map -> {
        });
        InterceptingCacheMetricGroup interceptingCacheMetricGroup = new InterceptingCacheMetricGroup();
        try {
            LookupFullCache lookupFullCache = new LookupFullCache(testCacheLoader, this.reloadTrigger);
            Throwable th = null;
            try {
                try {
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < 4; i++) {
                        arrayList.add(runAsync(ThrowingRunnable.unchecked(() -> {
                            lookupFullCache.setUserCodeClassLoader(Thread.currentThread().getContextClassLoader());
                            lookupFullCache.open(interceptingCacheMetricGroup);
                        }), newFixedThreadPool));
                    }
                    FutureUtils.waitForAll(arrayList).get();
                    arrayList.clear();
                    for (int i2 = 0; i2 < 4; i2++) {
                        arrayList.add(runAsync(() -> {
                            RowData row = StreamRecordUtils.row(1);
                            Assertions.assertThat(lookupFullCache.getIfPresent(row)).isEqualTo(TestCacheLoader.DATA.get(row));
                        }, newFixedThreadPool));
                    }
                    FutureUtils.waitForAll(arrayList).get();
                    Assertions.assertThat(interceptingCacheMetricGroup.hitCounter.getCount()).isEqualTo(4);
                    Assertions.assertThat(interceptingCacheMetricGroup.missCounter.getCount()).isZero();
                    if (lookupFullCache != null) {
                        if (0 != 0) {
                            try {
                                lookupFullCache.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            lookupFullCache.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            newFixedThreadPool.shutdownNow();
        }
    }

    private LookupFullCache createAndLoadCache(TestCacheLoader testCacheLoader) throws Exception {
        return createAndLoadCache(testCacheLoader, UnregisteredMetricsGroup.createCacheMetricGroup());
    }

    private LookupFullCache createAndLoadCache(TestCacheLoader testCacheLoader, CacheMetricGroup cacheMetricGroup) throws Exception {
        LookupFullCache lookupFullCache = new LookupFullCache(testCacheLoader, this.reloadTrigger);
        Assertions.assertThat(testCacheLoader.isAwaitTriggered()).isFalse();
        Assertions.assertThat(testCacheLoader.getNumLoads()).isZero();
        lookupFullCache.setUserCodeClassLoader(Thread.currentThread().getContextClassLoader());
        lookupFullCache.open(cacheMetricGroup);
        Assertions.assertThat(testCacheLoader.isAwaitTriggered()).isTrue();
        Assertions.assertThat(testCacheLoader.getNumLoads()).isEqualTo(1);
        Assertions.assertThat(testCacheLoader.getCache()).isEqualTo(TestCacheLoader.DATA);
        return lookupFullCache;
    }

    private CompletableFuture<Void> runAsync(Runnable runnable, ExecutorService executorService) {
        return CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> {
            Thread.sleep(ThreadLocalRandom.current().nextLong(0L, 10L));
            runnable.run();
        }), executorService);
    }
}
