/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.aggregate.correlate;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.runtime.generated.GeneratedFunctionWrapper;
import org.apache.flink.table.runtime.operators.correlate.async.AsyncCorrelateRunner;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.Test;

/**
 * Tests for {@link AsyncCorrelateRunner}.
 *
 * <p>The runner is driven with small stub {@link AsyncFunction}s that answer synchronously, and
 * the outcome is captured in a {@link TestResultFuture} so each test can inspect either the
 * emitted rows or the failure the future was completed with.
 */
public class AsyncCorrelateRunnerTest {

    /**
     * Checks the rows produced for inputs that yield zero, one and two function results. Each
     * expected output row is the input row joined with one row emitted by the function.
     */
    @Test
    public void testRows() throws Exception {
        AsyncCorrelateRunner runner = createRunner(new ImmediateCallbackFunction());

        // Input 0: the function emits nothing, so no output rows are expected.
        Collection<RowData> rows = invoke(runner, 0).getResult().get();
        Assertions.assertThat(rows).isEmpty();

        // Input 1: exactly one emitted row.
        rows = invoke(runner, 1).getResult().get();
        Assertions.assertThat(rows)
                .containsExactly(new JoinedRowData(GenericRowData.of(1), GenericRowData.of(1)));

        // Input 2: two emitted rows, in emission order.
        rows = invoke(runner, 2).getResult().get();
        Assertions.assertThat(rows)
                .containsExactly(
                        new JoinedRowData(GenericRowData.of(2), GenericRowData.of(10)),
                        new JoinedRowData(GenericRowData.of(2), GenericRowData.of(20)));
    }

    /**
     * Checks that a failure surfaces through the result future both when the function throws
     * directly from {@code asyncInvoke} and when it completes its future exceptionally.
     */
    @Test
    public void testException() throws Exception {
        AsyncCorrelateRunner runner = createRunner(new ExceptionFunction());

        // Input 0: the function throws from asyncInvoke.
        TestResultFuture thrownFuture = invoke(runner, 0);
        Assertions.assertThatThrownBy(() -> thrownFuture.getResult().get())
                .isInstanceOf(ExecutionException.class)
                .cause()
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Error!!!");

        // Any other input: the function reports the failure via completeExceptionally.
        TestResultFuture failedFuture = invoke(runner, 1);
        Assertions.assertThatThrownBy(() -> failedFuture.getResult().get())
                .isInstanceOf(ExecutionException.class)
                .cause()
                .isInstanceOf(RuntimeException.class)
                .hasMessageContaining("Other Error!");
    }

    /**
     * Builds a runner around {@code function} with a converter for a single-INT row type, then
     * sets its runtime context and opens it.
     *
     * @param function the stub function the runner should call
     * @return an opened runner ready for {@code asyncInvoke}
     */
    private AsyncCorrelateRunner createRunner(AsyncFunction<RowData, Object> function)
            throws Exception {
        // The diamond lets the wrapper's type argument be inferred from the constructor's
        // parameter type instead of being pinned to the concrete stub class.
        AsyncCorrelateRunner runner =
                new AsyncCorrelateRunner(
                        new GeneratedFunctionWrapper<>(function),
                        createConverter(RowType.of(new IntType())));
        runner.setRuntimeContext(createRuntimeContext());
        runner.open((OpenContext) null);
        return runner;
    }

    /**
     * Invokes {@code runner} with a one-field row holding {@code input}.
     *
     * @param runner the opened runner under test
     * @param input the int placed in field 0 of the input row
     * @return the future handed to the runner for this invocation
     */
    private static TestResultFuture invoke(AsyncCorrelateRunner runner, int input)
            throws Exception {
        TestResultFuture resultFuture = new TestResultFuture();
        runner.asyncInvoke(GenericRowData.of(input), resultFuture);
        return resultFuture;
    }

    /** Creates a {@link RuntimeUDFContext} backed by empty maps and an unregistered metric group. */
    private RuntimeContext createRuntimeContext() {
        return new RuntimeUDFContext(
                new TaskInfoImpl("", 1, 0, 1, 0),
                getClass().getClassLoader(),
                new ExecutionConfig(),
                new HashMap<>(),
                new HashMap<>(),
                UnregisteredMetricsGroup.createOperatorMetricGroup());
    }

    /** Looks up the data structure converter for {@code logicalType}, typed for {@link RowData}. */
    private DataStructureConverter<RowData, Object> createConverter(LogicalType logicalType) {
        return cast(
                DataStructureConverters.getConverter(
                        TypeConversions.fromLogicalToDataType(logicalType)));
    }

    /**
     * Re-types a generic converter as one whose internal type is {@link RowData}.
     *
     * <p>Generic types are invariant, so the conversion needs an explicit unchecked cast routed
     * through {@code Object}; a plain {@code return converter} does not compile.
     */
    @SuppressWarnings("unchecked")
    private DataStructureConverter<RowData, Object> cast(
            DataStructureConverter<Object, Object> converter) {
        return (DataStructureConverter<RowData, Object>) (Object) converter;
    }

    /** {@link ResultFuture} that records its outcome in a {@link CompletableFuture}. */
    public static final class TestResultFuture
    implements ResultFuture<RowData> {
        CompletableFuture<Collection<RowData>> data = new CompletableFuture<>();

        @Override
        public void complete(Collection<RowData> result) {
            this.data.complete(result);
        }

        @Override
        public void completeExceptionally(Throwable error) {
            this.data.completeExceptionally(error);
        }

        /** Returns the future that holds the recorded result or failure. */
        public CompletableFuture<Collection<RowData>> getResult() {
            return this.data;
        }

        /**
         * Evaluates {@code supplier} immediately and records its value.
         *
         * @throws RuntimeException wrapping any exception thrown by the supplier
         */
        @Override
        public void complete(CollectionSupplier<RowData> supplier) {
            try {
                this.data.complete(supplier.get());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Function that always fails: it throws for input {@code 0} and completes the future
     * exceptionally for every other input.
     */
    public static class ExceptionFunction
    implements AsyncFunction<RowData, Object> {
        @Override
        public void asyncInvoke(RowData input, ResultFuture<Object> resultFuture) throws Exception {
            int val = input.getInt(0);
            if (val == 0) {
                throw new RuntimeException("Error!!!!");
            }
            resultFuture.completeExceptionally(new RuntimeException("Other Error!"));
        }
    }

    /**
     * Function that completes the future before returning: one row {@code (1)} for input
     * {@code 1}, rows {@code (10)} and {@code (20)} for inputs greater than {@code 1}, and no rows
     * otherwise.
     */
    public static class ImmediateCallbackFunction
    implements AsyncFunction<RowData, Object> {
        @Override
        public void asyncInvoke(RowData input, ResultFuture<Object> resultFuture) throws Exception {
            // Typed as Collection<Object> to match ResultFuture<Object>#complete; a
            // collection of Row would not be accepted because generics are invariant.
            Collection<Object> result = new ArrayList<>();
            int val = input.getInt(0);
            if (val == 1) {
                result.add(Row.of(1));
            } else if (val > 1) {
                result.add(Row.of(10));
                result.add(Row.of(20));
            }
            resultFuture.complete(result);
        }
    }
}

