package org.apache.flink.table.runtime.operators.join;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.retryable.AsyncRetryStrategies;
import org.apache.flink.streaming.util.retryable.RetryPredicates;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.runtime.operators.join.lookup.ResultRetryStrategy;
import org.apache.flink.table.runtime.operators.join.lookup.RetryableAsyncLookupFunctionDelegator;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.types.logical.LogicalType;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest.class */
public class RetryableAsyncLookupFunctionDelegatorTest {
    private static final Map<RowData, Collection<RowData>> data = new HashMap();
    private final AsyncLookupFunction userLookupFunc = new TestingAsyncLookupFunction();
    private final ResultRetryStrategy retryStrategy = ResultRetryStrategy.fixedDelayRetry(3, 10, RetryPredicates.EMPTY_RESULT_PREDICATE);
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});

    /* loaded from: input_file:org/apache/flink/table/runtime/operators/join/RetryableAsyncLookupFunctionDelegatorTest$TestingAsyncLookupFunction.class */
    private static final class TestingAsyncLookupFunction extends AsyncLookupFunction {
        private static final long serialVersionUID = 1;
        private final Random random;
        private transient ExecutorService executor;

        private TestingAsyncLookupFunction() {
            this.random = new Random();
        }

        public void open(FunctionContext functionContext) throws Exception {
            super.open(functionContext);
            this.executor = Executors.newFixedThreadPool(2);
        }

        public CompletableFuture<Collection<RowData>> asyncLookup(RowData rowData) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(this.random.nextInt(5));
                    return (Collection) RetryableAsyncLookupFunctionDelegatorTest.data.get(rowData);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, this.executor);
        }

        public void close() throws Exception {
            if (null != this.executor && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
            super.close();
        }
    }

    private RetryableAsyncLookupFunctionDelegator createDelegator(ResultRetryStrategy resultRetryStrategy) {
        return new RetryableAsyncLookupFunctionDelegator(this.userLookupFunc, resultRetryStrategy);
    }

    @Test
    public void testLookupWithRetry() throws Exception {
        RetryableAsyncLookupFunctionDelegator createDelegator = createDelegator(this.retryStrategy);
        createDelegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1)));
        for (int i = 1; i <= 5; i++) {
            GenericRowData of = GenericRowData.of(new Object[]{Integer.valueOf(i)});
            this.assertor.assertOutputEquals("output wrong", Collections.singleton(data.get(of)), Collections.singleton(createDelegator.asyncLookup(of)));
        }
        createDelegator.close();
    }

    @Test
    public void testLookupWithRetryDisabled() throws Exception {
        RetryableAsyncLookupFunctionDelegator createDelegator = createDelegator(ResultRetryStrategy.NO_RETRY_STRATEGY);
        createDelegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1)));
        for (int i = 1; i <= 5; i++) {
            GenericRowData of = GenericRowData.of(new Object[]{Integer.valueOf(i)});
            this.assertor.assertOutputEquals("output wrong", Collections.singleton(data.get(of)), Collections.singleton(createDelegator.asyncLookup(of)));
        }
        createDelegator.close();
    }

    @Test
    public void testLookupWithCustomRetry() throws Exception {
        RetryableAsyncLookupFunctionDelegator createDelegator = createDelegator(new ResultRetryStrategy(new AsyncRetryStrategies.ExponentialBackoffDelayRetryStrategyBuilder(3, 1L, 100L, 1.1d).build()));
        createDelegator.open(new FunctionContext(new MockStreamingRuntimeContext(false, 1, 1)));
        for (int i = 1; i <= 5; i++) {
            GenericRowData of = GenericRowData.of(new Object[]{Integer.valueOf(i)});
            this.assertor.assertOutputEquals("output wrong", Collections.singleton(data.get(of)), Collections.singleton(createDelegator.asyncLookup(of)));
        }
        createDelegator.close();
    }

    static {
        data.put(GenericRowData.of(new Object[]{1}), Collections.singletonList(GenericRowData.of(new Object[]{1, StringData.fromString("Julian")})));
        data.put(GenericRowData.of(new Object[]{3}), Arrays.asList(GenericRowData.of(new Object[]{3, StringData.fromString("Jark")}), GenericRowData.of(new Object[]{3, StringData.fromString("Jackson")})));
        data.put(GenericRowData.of(new Object[]{4}), Collections.singletonList(GenericRowData.of(new Object[]{4, StringData.fromString("Fabian")})));
    }
}
