/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream.multijoin;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
import org.apache.flink.table.runtime.operators.join.stream.multijoin.StreamingMultiJoinOperatorTestBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StateParameterizedHarnessTestBase;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
class StreamingTwoWayNonEquiJoinOperatorTest
extends StreamingMultiJoinOperatorTestBase {
    private static final GeneratedJoinCondition EqualIdAndGreaterAmountCondition = StreamingTwoWayNonEquiJoinOperatorTest.createAndCondition(StreamingTwoWayNonEquiJoinOperatorTest.createJoinCondition(1, 0), StreamingTwoWayNonEquiJoinOperatorTest.createFieldLongGreaterThanCondition(1, 1));
    private static final List<GeneratedJoinCondition> customJoinConditions = Arrays.asList(null, EqualIdAndGreaterAmountCondition);
    private static final Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>> customAttributeMap = new HashMap<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>();

    public StreamingTwoWayNonEquiJoinOperatorTest(StateParameterizedHarnessTestBase.StateBackendMode stateBackendMode) {
        super(stateBackendMode, 2, List.of(FlinkJoinType.INNER, FlinkJoinType.INNER), customJoinConditions, customAttributeMap, false);
    }

    @TestTemplate
    void testInnerJoinWithNonEquiCondition() throws Exception {
        this.insertUser("1", 100L, "Gus");
        this.emitsNothing();
        this.insertOrder("1", 50L, "order_1");
        this.emits(INSERT, this.r("1", 100L, "Gus", "1", 50L, "order_1"));
        this.insertOrder("1", 150L, "order_2");
        this.emitsNothing();
        this.insertOrder("1", 100L, "order_3");
        this.emitsNothing();
        this.deleteUser("1", 100L, "Gus");
        this.emits(DELETE, this.r("1", 100L, "Gus", "1", 50L, "order_1"));
        this.insertUser("1", 200L, "Gus Updated");
        this.emits(INSERT, this.r("1", 200L, "Gus Updated", "1", 50L, "order_1"), INSERT, this.r("1", 200L, "Gus Updated", "1", 100L, "order_3"), INSERT, this.r("1", 200L, "Gus Updated", "1", 150L, "order_2"));
        this.deleteOrder("1", 50L, "order_1");
        this.emits(DELETE, this.r("1", 200L, "Gus Updated", "1", 50L, "order_1"));
    }

    @TestTemplate
    void testInnerJoinWithNonEquiConditionUserUpdates() throws Exception {
        this.insertUser("1", 100L, "Gus");
        this.emitsNothing();
        this.updateBeforeUser("1", 100L, "Gus");
        this.emitsNothing();
        this.updateAfterUser("1", 200L, "Gus Updated");
        this.emitsNothing();
        this.insertOrder("1", 150L, "order_1");
        this.emits(INSERT, this.r("1", 200L, "Gus Updated", "1", 150L, "order_1"));
        this.updateBeforeUser("1", 200L, "Gus Updated");
        this.emits(UPDATE_BEFORE, this.r("1", 200L, "Gus Updated", "1", 150L, "order_1"));
        this.updateAfterUser("1", 200L, "Gus Updated Again");
        this.emits(UPDATE_AFTER, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_1"));
        this.updateBeforeUser("1", 200L, "Gus Updated Again");
        this.emits(UPDATE_BEFORE, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_1"));
        this.updateAfterUser("1", 50L, "Gus Updated Again to 50");
        this.emitsNothing();
        this.updateBeforeUser("1", 50L, "Gus Updated Again to 50");
        this.emitsNothing();
        this.updateAfterUser("1", 200L, "Gus Updated Again");
        this.emits(UPDATE_AFTER, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_1"));
        this.insertOrder("1", 150L, "order_2");
        this.emits(INSERT, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_2"));
        this.updateBeforeUser("1", 200L, "Gus Updated Again");
        this.emits(UPDATE_BEFORE, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_1"), UPDATE_BEFORE, this.r("1", 200L, "Gus Updated Again", "1", 150L, "order_2"));
        this.updateAfterUser("1", 500L, "Gus Final");
        this.emits(UPDATE_AFTER, this.r("1", 500L, "Gus Final", "1", 150L, "order_1"), UPDATE_AFTER, this.r("1", 500L, "Gus Final", "1", 150L, "order_2"));
    }

    @TestTemplate
    void testInnerJoinWithNonEquiConditionOrderUpdates() throws Exception {
        this.insertUser("1", 100L, "Gus");
        this.emitsNothing();
        this.insertOrder("1", 150L, "order_1");
        this.emitsNothing();
        this.updateBeforeOrder("1", 150L, "order_1");
        this.emitsNothing();
        this.updateAfterOrder("1", 50L, "order_1");
        this.emits(RowKind.UPDATE_AFTER, this.r("1", 100L, "Gus", "1", 50L, "order_1"));
        this.updateBeforeOrder("1", 50L, "order_1");
        this.emits(RowKind.UPDATE_BEFORE, this.r("1", 100L, "Gus", "1", 50L, "order_1"));
        this.updateAfterOrder("1", 100L, "order_1");
        this.emitsNothing();
        this.insertUser("1", 200L, "Bob");
        this.emits(INSERT, this.r("1", 200L, "Bob", "1", 100L, "order_1"));
        this.updateBeforeOrder("1", 100L, "order_1");
        this.emits(UPDATE_BEFORE, this.r("1", 200L, "Bob", "1", 100L, "order_1"));
        this.updateAfterOrder("1", 250L, "order_1_too_high");
        this.emitsNothing();
    }

    @Override
    protected RowType createInputTypeInfo(int inputIndex) {
        if (inputIndex == 0) {
            return RowType.of((LogicalType[])new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType(), VarCharType.STRING_TYPE}, (String[])new String[]{"user_id_0", "amount_0", "name_0"});
        }
        return RowType.of((LogicalType[])new LogicalType[]{VarCharType.STRING_TYPE, new BigIntType(), VarCharType.STRING_TYPE}, (String[])new String[]{"user_id_1", "amount_1", "order_id_1"});
    }

    @Override
    protected InternalTypeInfo<RowData> createUniqueKeyType(int inputIndex) {
        if (inputIndex == 0) {
            return InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{VarCharType.STRING_TYPE}, (String[])new String[]{"user_id_0"}));
        }
        return InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{VarCharType.STRING_TYPE}, (String[])new String[]{"order_id_1"}));
    }

    @Override
    protected RowDataKeySelector createKeySelector(int inputIndex) {
        return HandwrittenSelectorUtil.getRowDataSelector(new int[]{inputIndex == 0 ? 0 : 2}, ((RowType)this.inputTypeInfos.get(inputIndex)).getChildren().toArray(new LogicalType[0]));
    }

    protected static GeneratedJoinCondition createAmountGreaterThanCondition(final int usersInputInArray, final int usersAmountField, final int ordersInputInArray, final int ordersAmountField) {
        String generatedClassName = "AmountGreaterThanCondition_manual";
        return new GeneratedJoinCondition(generatedClassName, "", new Object[0]){

            public JoinCondition newInstance(ClassLoader classLoader) {
                return new AmountGreaterThanConditionImpl(usersInputInArray, usersAmountField, ordersInputInArray, ordersAmountField);
            }
        };
    }

    static {
        customAttributeMap.put(1, Collections.singletonList(new AttributeBasedJoinKeyExtractor.ConditionAttributeRef(0, 0, 1, 0)));
    }

    private static class AndJoinConditionImpl
    extends AbstractRichFunction
    implements JoinCondition {
        private final List<JoinCondition> conditions;

        public AndJoinConditionImpl(JoinCondition ... conditions) {
            this.conditions = Arrays.asList(conditions);
        }

        public boolean apply(RowData left, RowData right) {
            for (JoinCondition condition : this.conditions) {
                if (condition.apply(left, right)) continue;
                return false;
            }
            return true;
        }
    }

    private static class UserIdEqualsConditionImpl
    extends AbstractRichFunction
    implements JoinCondition {
        private final int usersInputId;
        private final int usersIdField;
        private final int ordersInputId;
        private final int ordersIdField;

        public UserIdEqualsConditionImpl(int usersInputId, int usersIdField, int ordersInputId, int ordersIdField) {
            this.usersInputId = usersInputId;
            this.usersIdField = usersIdField;
            this.ordersInputId = ordersInputId;
            this.ordersIdField = ordersIdField;
        }

        public boolean apply(RowData left, RowData right) {
            if (left == null || right == null) {
                return false;
            }
            if (left.isNullAt(this.usersIdField) || right.isNullAt(this.ordersIdField)) {
                return false;
            }
            String userId = left.getString(this.usersIdField).toString();
            String orderUserId = right.getString(this.ordersIdField).toString();
            return userId.equals(orderUserId);
        }
    }

    private static class AmountGreaterThanConditionImpl
    extends AbstractRichFunction
    implements JoinCondition {
        private final int leftInputIndex;
        private final int leftAmountFieldIndex;
        private final int rightInputIndex;
        private final int rightAmountFieldIndex;

        public AmountGreaterThanConditionImpl(int leftInputIndex, int leftAmountFieldIndex, int rightInputIndex, int rightAmountFieldIndex) {
            this.leftInputIndex = leftInputIndex;
            this.leftAmountFieldIndex = leftAmountFieldIndex;
            this.rightInputIndex = rightInputIndex;
            this.rightAmountFieldIndex = rightAmountFieldIndex;
        }

        public boolean apply(RowData left, RowData right) {
            if (left == null || right == null) {
                return false;
            }
            if (left.isNullAt(this.leftAmountFieldIndex) || right.isNullAt(this.rightAmountFieldIndex)) {
                return false;
            }
            return left.getLong(this.leftAmountFieldIndex) > right.getLong(this.rightAmountFieldIndex);
        }
    }
}

