/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Predicate;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.async.AsyncRetryStrategy;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link RetryableAsyncLookupFunctionDelegator}, exercised against an in-memory
 * {@link TestingAsyncLookupFunction} under three retry configurations: a fixed-delay strategy,
 * {@link ResultRetryStrategy#NO_RETRY_STRATEGY}, and a custom exponential-backoff strategy.
 */
class RetryableAsyncLookupFunctionDelegatorTest {
    /**
     * Backing lookup table shared by the tests and the testing lookup function. Only keys 1, 3
     * and 4 are populated, so lookups for any other key yield {@code null} from this map.
     */
    private static final Map<RowData, Collection<RowData>> data = new HashMap<>();

    static {
        data.put(
                GenericRowData.of(1),
                Collections.singletonList(GenericRowData.of(1, StringData.fromString("Julian"))));
        data.put(
                GenericRowData.of(3),
                Arrays.asList(
                        GenericRowData.of(3, StringData.fromString("Jark")),
                        GenericRowData.of(3, StringData.fromString("Jackson"))));
        data.put(
                GenericRowData.of(4),
                Collections.singletonList(GenericRowData.of(4, StringData.fromString("Fabian"))));
    }

    /** User lookup function wrapped by every delegator created in this test class. */
    private final AsyncLookupFunction userLookupFunc = new TestingAsyncLookupFunction();

    /** Fixed-delay strategy built with (3, 10L) and the empty-result predicate. */
    private final ResultRetryStrategy retryStrategy =
            ResultRetryStrategy.fixedDelayRetry(
                    3, 10L, (Predicate) RetryPredicates.EMPTY_RESULT_PREDICATE);

    /** Assertor configured for (INT, STRING) rows, matching the rows stored in {@link #data}. */
    private final RowDataHarnessAssertor assertor =
            new RowDataHarnessAssertor(
                    new LogicalType[] {
                        DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()
                    });

    /**
     * Wraps {@link #userLookupFunc} in a delegator that uses the given retry strategy.
     *
     * @param retryStrategy retry strategy handed to the delegator
     * @return a new, not yet opened delegator
     */
    private RetryableAsyncLookupFunctionDelegator createDelegator(
            ResultRetryStrategy retryStrategy) {
        return new RetryableAsyncLookupFunctionDelegator(this.userLookupFunc, retryStrategy);
    }

    /**
     * Opens the delegator, looks up keys 1 through 5 and hands each result to the assertor
     * together with the corresponding entry of {@link #data}.
     *
     * <p>The delegator is closed in a {@code finally} block so that the thread pool created by
     * {@link TestingAsyncLookupFunction#open(FunctionContext)} is shut down even when a lookup
     * or an assertion fails.
     *
     * @param delegator delegator under test; must not have been opened yet
     * @throws Exception if opening, looking up, asserting or closing fails
     */
    private void assertLookupsMatchData(RetryableAsyncLookupFunctionDelegator delegator)
            throws Exception {
        delegator.open(new FunctionContext((RuntimeContext) new MockStreamingRuntimeContext(1, 0)));
        try {
            for (int i = 1; i <= 5; i++) {
                RowData key = GenericRowData.of(i);
                // NOTE(review): the "actual" side wraps whatever delegator.asyncLookup returns
                // (presumably a CompletableFuture, as in TestingAsyncLookupFunction) rather than
                // its completed value - confirm RowDataHarnessAssertor compares this as intended.
                this.assertor.assertOutputEquals(
                        "output wrong",
                        Collections.singleton(data.get(key)),
                        Collections.singleton(delegator.asyncLookup(key)));
            }
        } finally {
            delegator.close();
        }
    }

    /** Looks up all keys through a delegator configured with the fixed-delay strategy. */
    @Test
    void testLookupWithRetry() throws Exception {
        assertLookupsMatchData(createDelegator(this.retryStrategy));
    }

    /** Looks up all keys through a delegator configured with the no-retry strategy. */
    @Test
    void testLookupWithRetryDisabled() throws Exception {
        assertLookupsMatchData(createDelegator(ResultRetryStrategy.NO_RETRY_STRATEGY));
    }

    /** Looks up all keys through a delegator configured with an exponential-backoff strategy. */
    @Test
    void testLookupWithCustomRetry() throws Exception {
        // Named differently from the retryStrategy field to avoid shadowing it.
        AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategy customStrategy =
                new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder(
                                3, 1L, 100L, 1.1)
                        .build();
        assertLookupsMatchData(
                createDelegator(new ResultRetryStrategy((AsyncRetryStrategy) customStrategy)));
    }

    /**
     * Lookup function that answers from {@link #data} on a two-thread pool after sleeping for a
     * random 0-4 ms, so that results are produced asynchronously.
     */
    private static final class TestingAsyncLookupFunction extends AsyncLookupFunction {
        private static final long serialVersionUID = 1L;
        private final Random random = new Random();
        /** Created in {@link #open(FunctionContext)} and shut down in {@link #close()}. */
        private transient ExecutorService executor;

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);
            this.executor = Executors.newFixedThreadPool(2);
        }

        /**
         * Asynchronously returns the rows stored in {@link #data} for the given key.
         *
         * @param keyRow lookup key
         * @return future completing with the stored rows, or with {@code null} when the key is
         *     not present in {@link #data}
         */
        @Override
        public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
            return CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            Thread.sleep(this.random.nextInt(5));
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the pool thread still observes the
                            // interruption after the exception has been wrapped.
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        }
                        return data.get(keyRow);
                    },
                    this.executor);
        }

        @Override
        public void close() throws Exception {
            // The executor is null when close() is called without a preceding open().
            if (null != this.executor && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
            super.close();
        }
    }
}

