/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.functions.table;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.functions.table.lookup.CachingAsyncLookupFunction;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests that {@link CachingAsyncLookupFunction} serves repeated lookups from its cache instead of
 * calling the wrapped {@link AsyncLookupFunction} again.
 */
class CachingAsyncLookupFunctionTest {
    private static final RowData KEY_1 = GenericRowData.of(1);
    private static final Collection<RowData> VALUE_1 =
            Collections.singletonList(GenericRowData.of(1, "Alice", 18L));
    private static final RowData KEY_2 = GenericRowData.of(2);
    private static final Collection<RowData> VALUE_2 =
            Arrays.asList(GenericRowData.of(2, "Bob", 20L), GenericRowData.of(2, "Charlie", 22L));
    /** A key that {@link TestingAsyncLookupFunction} has no rows for. */
    private static final RowData NON_EXIST_KEY = GenericRowData.of(3);

    /**
     * Looks up three keys twice and verifies that the delegate was only invoked once per key and
     * that the cache holds the expected rows, including an empty collection for the missing key.
     */
    @Test
    void testCaching() throws Exception {
        TestingAsyncLookupFunction delegate = new TestingAsyncLookupFunction();
        CachingAsyncLookupFunction function = createCachingFunction(delegate);
        try {
            // First round: every key has to be loaded through the delegate.
            FutureUtils.completeAll(
                            Arrays.asList(
                                    function.asyncLookup(KEY_1),
                                    function.asyncLookup(KEY_2),
                                    function.asyncLookup(NON_EXIST_KEY)))
                    .get();
            // Second round: the same keys again; the count assertion below requires that none
            // of these reach the delegate.
            FutureUtils.completeAll(
                            Arrays.asList(
                                    function.asyncLookup(KEY_1),
                                    function.asyncLookup(KEY_2),
                                    function.asyncLookup(NON_EXIST_KEY)))
                    .get();

            Assertions.assertThat(delegate.getLookupCount()).hasValue(3);
            Assertions.assertThat(function.getCache().getIfPresent(KEY_1))
                    .containsExactlyInAnyOrderElementsOf(VALUE_1);
            Assertions.assertThat(function.getCache().getIfPresent(KEY_2))
                    .containsExactlyInAnyOrderElementsOf(VALUE_2);
            Assertions.assertThat(function.getCache().getIfPresent(NON_EXIST_KEY)).isEmpty();
        } finally {
            try {
                function.close();
            } finally {
                // Idempotent: guarantees the delegate's thread pool is stopped even if the
                // caching wrapper does not forward close() to it.
                delegate.close();
            }
        }
    }

    /**
     * Wraps {@code delegate} in a {@link CachingAsyncLookupFunction} backed by a
     * {@link DefaultLookupCache} with maximum size {@link Long#MAX_VALUE}, and opens it.
     *
     * @param delegate the lookup function to wrap
     * @return the opened caching function; the caller is responsible for closing it
     * @throws Exception if opening the function fails
     */
    private CachingAsyncLookupFunction createCachingFunction(AsyncLookupFunction delegate)
            throws Exception {
        CachingAsyncLookupFunction function =
                new CachingAsyncLookupFunction(
                        DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE).build(),
                        delegate);
        function.open(new FunctionContext(new MockStreamingRuntimeContext(1, 0)));
        return function;
    }

    /**
     * In-memory {@link AsyncLookupFunction} that answers from a fixed map on a small thread pool
     * after a short random delay, and counts how many lookups it has served.
     */
    private static final class TestingAsyncLookupFunction extends AsyncLookupFunction {
        private final transient ConcurrentMap<RowData, Collection<RowData>> data =
                new ConcurrentHashMap<>();
        private transient AtomicInteger lookupCount;
        private transient ExecutorService executor;

        /** Populates the backing data, resets the lookup counter and starts the thread pool. */
        @Override
        public void open(FunctionContext context) throws Exception {
            data.put(KEY_1, VALUE_1);
            data.put(KEY_2, VALUE_2);
            lookupCount = new AtomicInteger(0);
            executor = Executors.newFixedThreadPool(3);
        }

        /**
         * Looks up {@code keyRow} asynchronously on the internal executor.
         *
         * @param keyRow the key to look up
         * @return a future completing with the rows stored for the key, or with {@code null} when
         *     the key is unknown
         */
        @Override
        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            // Random delay so concurrent lookups finish in varying order.
                            Thread.sleep(ThreadLocalRandom.current().nextInt(0, 10));
                            Collection<RowData> values = data.get(keyRow);
                            lookupCount.incrementAndGet();
                            return values;
                        } catch (InterruptedException e) {
                            // Restore the flag so the pool thread observes the interruption.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException("Failed to lookup value", e);
                        } catch (Exception e) {
                            throw new RuntimeException("Failed to lookup value", e);
                        }
                    },
                    executor);
        }

        /**
         * Stops the thread pool created in {@link #open(FunctionContext)}. Safe to call more than
         * once, and safe to call when the function was never opened.
         */
        @Override
        public void close() throws Exception {
            if (executor != null) {
                executor.shutdownNow();
            }
        }

        /** Returns the counter of lookups served so far; {@code null} before {@code open}. */
        public AtomicInteger getLookupCount() {
            return lookupCount;
        }
    }
}

