/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorFactory;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.generated.GeneratedResultFutureWrapper;
import org.apache.flink.table.runtime.operators.join.LookupJoinHarnessTest;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinRunner;
import org.apache.flink.table.runtime.operators.join.lookup.AsyncLookupJoinWithCalcRunner;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class AsyncLookupJoinHarnessTest {
    private static final int ASYNC_BUFFER_CAPACITY = 100;
    private static final int ASYNC_TIMEOUT_MS = 3000;
    private boolean orderedResult;
    private final TypeSerializer<RowData> inSerializer = new RowDataSerializer(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
    private final RowDataHarnessAssertor assertor = new RowDataHarnessAssertor(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()});
    private final DataType rightRowDataType = (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"f0", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"f1", (DataType)DataTypes.STRING())}).bridgedTo(RowData.class);
    private final DataStructureConverter<RowData, Object> fetcherConverter = DataStructureConverters.getConverter((DataType)this.rightRowDataType);
    private final RowDataSerializer rightRowSerializer = (RowDataSerializer)InternalSerializers.create((LogicalType)this.rightRowDataType.getLogicalType());

    AsyncLookupJoinHarnessTest() {
    }

    @Parameters(name="ordered result = {0}")
    private static Object[] parameters() {
        return new Object[][]{{true}, {false}};
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testTemporalInnerAsyncJoin() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITHOUT_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.endInput();
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        this.checkResult(expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testTemporalInnerAsyncJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createHarness(JoinType.INNER_JOIN, FilterOnTable.WITH_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.endInput();
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        this.checkResult(expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testTemporalLeftAsyncJoin() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITHOUT_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.endInput();
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jark"));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(6, null, null, null));
        this.checkResult(expectedOutput, testHarness.getOutput());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testTemporalLeftAsyncJoinWithFilter() throws Exception {
        OneInputStreamOperatorTestHarness<RowData, RowData> testHarness = this.createHarness(JoinType.LEFT_JOIN, FilterOnTable.WITH_FILTER);
        testHarness.open();
        Object object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.processElement(StreamRecordUtils.insertRecord(1, "a"));
            testHarness.processElement(StreamRecordUtils.insertRecord(2, "b"));
            testHarness.processElement(StreamRecordUtils.insertRecord(3, "c"));
            testHarness.processElement(StreamRecordUtils.insertRecord(4, "d"));
            testHarness.processElement(StreamRecordUtils.insertRecord(5, "e"));
            testHarness.processElement(StreamRecordUtils.insertRecord(6, null));
        }
        object = testHarness.getCheckpointLock();
        synchronized (object) {
            testHarness.endInput();
            testHarness.close();
        }
        ArrayList<Object> expectedOutput = new ArrayList<Object>();
        expectedOutput.add(StreamRecordUtils.insertRecord(1, "a", 1, "Julian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(2, "b", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(3, "c", 3, "Jackson"));
        expectedOutput.add(StreamRecordUtils.insertRecord(4, "d", 4, "Fabian"));
        expectedOutput.add(StreamRecordUtils.insertRecord(5, "e", null, null));
        expectedOutput.add(StreamRecordUtils.insertRecord(6, null, null, null));
        this.checkResult(expectedOutput, testHarness.getOutput());
    }

    private void checkResult(Collection<Object> expectedOutput, Collection<Object> actualOutput) {
        if (this.orderedResult) {
            this.assertor.assertOutputEquals("output wrong.", expectedOutput, actualOutput);
        } else {
            this.assertor.assertOutputEqualsSorted("output wrong.", expectedOutput, actualOutput);
        }
    }

    private OneInputStreamOperatorTestHarness<RowData, RowData> createHarness(JoinType joinType, FilterOnTable filterOnTable) throws Exception {
        boolean isLeftJoin = joinType == JoinType.LEFT_JOIN;
        Object joinRunner = filterOnTable == FilterOnTable.WITHOUT_FILTER ? new AsyncLookupJoinRunner(new GeneratedFunctionWrapper<TestingFetcherFunction>(new TestingFetcherFunction()), this.fetcherConverter, new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()), new GeneratedFunctionWrapper<LookupJoinHarnessTest.TestingPreFilterCondition>(new LookupJoinHarnessTest.TestingPreFilterCondition()), this.rightRowSerializer, isLeftJoin, 100) : new AsyncLookupJoinWithCalcRunner(new GeneratedFunctionWrapper<TestingFetcherFunction>(new TestingFetcherFunction()), this.fetcherConverter, new GeneratedFunctionWrapper<CalculateOnTemporalTable>(new CalculateOnTemporalTable()), new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()), new GeneratedFunctionWrapper<LookupJoinHarnessTest.TestingPreFilterCondition>(new LookupJoinHarnessTest.TestingPreFilterCondition()), this.rightRowSerializer, isLeftJoin, 100);
        return new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new AsyncWaitOperatorFactory((AsyncFunction)joinRunner, 3000L, 100, this.orderedResult ? AsyncDataStream.OutputMode.ORDERED : AsyncDataStream.OutputMode.UNORDERED), this.inSerializer);
    }

    @TestTemplate
    void testCloseAsyncLookupJoinRunner() throws Exception {
        AsyncLookupJoinRunner joinRunner = new AsyncLookupJoinRunner(new GeneratedFunctionWrapper<TestingFetcherFunction>(new TestingFetcherFunction()), this.fetcherConverter, new GeneratedResultFutureWrapper<TestingFetcherResultFuture>(new TestingFetcherResultFuture()), new GeneratedFunctionWrapper<LookupJoinHarnessTest.TestingPreFilterCondition>(new LookupJoinHarnessTest.TestingPreFilterCondition()), this.rightRowSerializer, true, 100);
        Assertions.assertThat((List)joinRunner.getAllResultFutures()).isNull();
        this.closeAsyncLookupJoinRunner(joinRunner);
        joinRunner.setRuntimeContext((RuntimeContext)new MockStreamingRuntimeContext(1, 0));
        joinRunner.open(DefaultOpenContext.INSTANCE);
        Assertions.assertThat((List)joinRunner.getAllResultFutures()).isNotNull();
        this.closeAsyncLookupJoinRunner(joinRunner);
        joinRunner.open(DefaultOpenContext.INSTANCE);
        joinRunner.asyncInvoke(StreamRecordUtils.row(1, "a"), (ResultFuture)new TestingFetcherResultFuture());
        Assertions.assertThat((List)joinRunner.getAllResultFutures()).isNotNull();
        this.closeAsyncLookupJoinRunner(joinRunner);
    }

    private void closeAsyncLookupJoinRunner(AsyncLookupJoinRunner joinRunner) throws Exception {
        try {
            joinRunner.close();
        }
        catch (NullPointerException e) {
            Assertions.fail((String)"Unexpected close to fail with null pointer exception.");
        }
    }

    public static final class CalculateOnTemporalTable
    implements FlatMapFunction<RowData, RowData> {
        private static final long serialVersionUID = -1860345072157431136L;

        public void flatMap(RowData value, Collector<RowData> out) throws Exception {
            BinaryStringData name = (BinaryStringData)value.getString(1);
            if (name.getSizeInBytes() >= 6) {
                out.collect((Object)value);
            }
        }
    }

    public static final class TestingFetcherResultFuture
    extends TableFunctionResultFuture<RowData> {
        private static final long serialVersionUID = -312754413938303160L;

        public void complete(Collection<RowData> result) {
            this.getResultFuture().complete(result);
        }
    }

    public static final class TestingFetcherFunction
    extends AbstractRichFunction
    implements AsyncFunction<RowData, RowData> {
        private static final long serialVersionUID = 4018474964018227081L;
        private static final Map<Integer, List<RowData>> data = new HashMap<Integer, List<RowData>>();
        private final Random random = new Random();
        private transient ExecutorService executor;

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.executor = Executors.newFixedThreadPool(2);
        }

        public void asyncInvoke(RowData input, ResultFuture<RowData> resultFuture) throws Exception {
            int id = input.getInt(0);
            CompletableFuture.supplyAsync(() -> {
                try {
                    Thread.sleep(this.random.nextInt(5));
                }
                catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return data.get(id);
            }, this.executor).thenAcceptAsync(arg_0 -> resultFuture.complete(arg_0), (Executor)this.executor);
        }

        public void close() throws Exception {
            super.close();
            if (null != this.executor && !this.executor.isShutdown()) {
                this.executor.shutdown();
            }
        }

        static {
            data.put(1, Collections.singletonList(GenericRowData.of((Object[])new Object[]{1, StringData.fromString((String)"Julian")})));
            data.put(3, Arrays.asList(GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jark")}), GenericRowData.of((Object[])new Object[]{3, StringData.fromString((String)"Jackson")})));
            data.put(4, Collections.singletonList(GenericRowData.of((Object[])new Object[]{4, StringData.fromString((String)"Fabian")})));
        }
    }

    private static enum FilterOnTable {
        WITH_FILTER,
        WITHOUT_FILTER;

    }

    private static enum JoinType {
        INNER_JOIN,
        LEFT_JOIN;

    }
}

