/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream.multijoin;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedMultiInputStreamOperatorTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedJoinCondition;
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.stream.StreamingMultiJoinOperatorFactory;
import org.apache.flink.table.runtime.operators.join.stream.keyselector.AttributeBasedJoinKeyExtractor;
import org.apache.flink.table.runtime.operators.join.stream.keyselector.JoinKeyExtractor;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.RowDataHarnessAssertor;
import org.apache.flink.table.runtime.util.StateParameterizedHarnessTestBase;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.apache.flink.types.RowKind;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public abstract class StreamingMultiJoinOperatorTestBase
extends StateParameterizedHarnessTestBase {
    protected static final RowKind INSERT = RowKind.INSERT;
    protected static final RowKind UPDATE_BEFORE = RowKind.UPDATE_BEFORE;
    protected static final RowKind UPDATE_AFTER = RowKind.UPDATE_AFTER;
    protected static final RowKind DELETE = RowKind.DELETE;
    protected final List<RowType> inputTypeInfos;
    protected final List<RowDataKeySelector> keySelectors;
    protected final List<JoinInputSideSpec> inputSpecs;
    protected final List<FlinkJoinType> joinTypes;
    protected final List<GeneratedJoinCondition> joinConditions;
    protected final boolean isFullOuterJoin;
    protected final Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>> joinAttributeMap;
    protected final JoinKeyExtractor keyExtractor;
    protected RowDataHarnessAssertor asserter;
    protected KeyedMultiInputStreamOperatorTestHarness<RowData, RowData> testHarness;

    protected StreamingMultiJoinOperatorTestBase(StateParameterizedHarnessTestBase.StateBackendMode stateBackendMode, int numInputs, List<FlinkJoinType> joinTypes, List<GeneratedJoinCondition> joinConditions, boolean isFullOuterJoin) {
        super(stateBackendMode);
        this.inputTypeInfos = new ArrayList<RowType>(numInputs);
        this.keySelectors = new ArrayList<RowDataKeySelector>(numInputs);
        this.inputSpecs = new ArrayList<JoinInputSideSpec>(numInputs);
        this.joinTypes = joinTypes;
        this.isFullOuterJoin = isFullOuterJoin;
        this.joinConditions = joinConditions;
        this.joinAttributeMap = new HashMap<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>>();
        this.initializeInputs(numInputs);
        this.initializeJoinConditions();
        this.keyExtractor = new AttributeBasedJoinKeyExtractor(this.joinAttributeMap, this.inputTypeInfos);
    }

    protected StreamingMultiJoinOperatorTestBase(StateParameterizedHarnessTestBase.StateBackendMode stateBackendMode, int numInputs, List<FlinkJoinType> joinTypes, List<GeneratedJoinCondition> joinConditions, Map<Integer, List<AttributeBasedJoinKeyExtractor.ConditionAttributeRef>> joinAttributeMap, boolean isFullOuterJoin) {
        super(stateBackendMode);
        this.inputTypeInfos = new ArrayList<RowType>(numInputs);
        this.keySelectors = new ArrayList<RowDataKeySelector>(numInputs);
        this.inputSpecs = new ArrayList<JoinInputSideSpec>(numInputs);
        this.joinTypes = joinTypes;
        this.isFullOuterJoin = isFullOuterJoin;
        this.joinConditions = joinConditions;
        this.joinAttributeMap = joinAttributeMap;
        this.initializeInputs(numInputs);
        this.keyExtractor = new AttributeBasedJoinKeyExtractor(this.joinAttributeMap, this.inputTypeInfos);
    }

    @BeforeEach
    protected void beforeEach() throws Exception {
        this.testHarness = this.createTestHarness();
        this.setupKeySelectorsForTestHarness(this.testHarness);
        this.testHarness.setup();
        this.testHarness.open();
        this.asserter = new RowDataHarnessAssertor(this.getOutputType().getChildren().toArray(new LogicalType[0]));
    }

    @AfterEach
    protected void afterEach() throws Exception {
        if (this.testHarness != null) {
            this.testHarness.close();
        }
    }

    protected void insertUser(Object ... fields) throws Exception {
        this.testHarness.processElement(0, new StreamRecord((Object)StreamRecordUtils.rowOfKind(INSERT, fields)));
    }

    protected void insertOrder(Object ... fields) throws Exception {
        this.testHarness.processElement(1, new StreamRecord((Object)StreamRecordUtils.rowOfKind(INSERT, fields)));
    }

    protected void insertPayment(Object ... fields) throws Exception {
        this.testHarness.processElement(2, new StreamRecord((Object)StreamRecordUtils.rowOfKind(INSERT, fields)));
    }

    protected void insertShipment(Object ... fields) throws Exception {
        this.testHarness.processElement(3, new StreamRecord((Object)StreamRecordUtils.rowOfKind(INSERT, fields)));
    }

    protected void updateBeforeUser(Object ... fields) throws Exception {
        this.testHarness.processElement(0, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_BEFORE, fields)));
    }

    protected void updateAfterUser(Object ... fields) throws Exception {
        this.testHarness.processElement(0, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_AFTER, fields)));
    }

    protected void updateBeforeOrder(Object ... fields) throws Exception {
        this.testHarness.processElement(1, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_BEFORE, fields)));
    }

    protected void updateAfterOrder(Object ... fields) throws Exception {
        this.testHarness.processElement(1, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_AFTER, fields)));
    }

    protected void updateBeforePayment(Object ... fields) throws Exception {
        this.testHarness.processElement(2, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_BEFORE, fields)));
    }

    protected void updateAfterPayment(Object ... fields) throws Exception {
        this.testHarness.processElement(2, new StreamRecord((Object)StreamRecordUtils.rowOfKind(UPDATE_AFTER, fields)));
    }

    protected void deleteUser(Object ... fields) throws Exception {
        this.testHarness.processElement(0, new StreamRecord((Object)StreamRecordUtils.rowOfKind(DELETE, fields)));
    }

    protected void deleteOrder(Object ... fields) throws Exception {
        this.testHarness.processElement(1, new StreamRecord((Object)StreamRecordUtils.rowOfKind(DELETE, fields)));
    }

    protected void deletePayment(Object ... fields) throws Exception {
        this.testHarness.processElement(2, new StreamRecord((Object)StreamRecordUtils.rowOfKind(DELETE, fields)));
    }

    protected void deleteShipment(Object ... fields) throws Exception {
        this.testHarness.processElement(3, new StreamRecord((Object)StreamRecordUtils.rowOfKind(DELETE, fields)));
    }

    protected static List<GeneratedJoinCondition> defaultConditions() {
        return new ArrayList<GeneratedJoinCondition>();
    }

    protected void emits(RowKind kind, Object ... fields) throws Exception {
        this.asserter.shouldEmitAll((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, this.rowOfKind(kind, fields));
    }

    protected void emitsNothing() {
        this.asserter.shouldEmitNothing((AbstractStreamOperatorTestHarness<RowData>)this.testHarness);
    }

    protected void emits(RowKind kind1, Object[] fields1, RowKind kind2, Object[] fields2) throws Exception {
        this.asserter.shouldEmitAll((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, this.rowOfKind(kind1, fields1), this.rowOfKind(kind2, fields2));
    }

    protected void emits(RowKind kind1, Object[] fields1, RowKind kind2, Object[] fields2, RowKind kind3, Object[] fields3) throws Exception {
        this.asserter.shouldEmitAll((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, this.rowOfKind(kind1, fields1), this.rowOfKind(kind2, fields2), this.rowOfKind(kind3, fields3));
    }

    protected void emits(RowKind kind1, Object[] fields1, RowKind kind2, Object[] fields2, RowKind kind3, Object[] fields3, RowKind kind4, Object[] fields4) throws Exception {
        this.asserter.shouldEmitAll((AbstractStreamOperatorTestHarness<RowData>)this.testHarness, this.rowOfKind(kind1, fields1), this.rowOfKind(kind2, fields2), this.rowOfKind(kind3, fields3), this.rowOfKind(kind4, fields4));
    }

    private void initializeInputs(int numInputs) {
        if (numInputs < 2) {
            throw new IllegalArgumentException("Number of inputs must be at least 2");
        }
        this.inputTypeInfos.add(this.createInputTypeInfo(0));
        this.keySelectors.add(this.createKeySelector(0));
        this.inputSpecs.add(JoinInputSideSpec.withUniqueKeyContainedByJoinKey(this.createUniqueKeyType(0), (KeySelector)((KeySelector)this.keySelectors.get(0))));
        for (int i = 1; i < numInputs; ++i) {
            this.inputTypeInfos.add(this.createInputTypeInfo(i));
            this.keySelectors.add(this.createKeySelector(i));
            InternalTypeInfo<RowData> uniqueKeyRowType = this.createUniqueKeyType(i);
            if (uniqueKeyRowType != null) {
                this.inputSpecs.add(JoinInputSideSpec.withUniqueKey(uniqueKeyRowType, (KeySelector)((KeySelector)this.keySelectors.get(i))));
                continue;
            }
            this.inputSpecs.add(JoinInputSideSpec.withoutUniqueKey());
        }
    }

    private void initializeJoinConditions() {
        if (!this.joinAttributeMap.isEmpty()) {
            return;
        }
        if (this.joinConditions.isEmpty()) {
            this.joinConditions.add(null);
            for (int i = 1; i < this.inputSpecs.size(); ++i) {
                GeneratedJoinCondition condition = StreamingMultiJoinOperatorTestBase.createJoinCondition(i, i - 1);
                this.joinConditions.add(condition);
                this.joinAttributeMap.put(i, Collections.singletonList(new AttributeBasedJoinKeyExtractor.ConditionAttributeRef(i - 1, 0, i, 0)));
            }
        } else if (this.joinConditions.size() != this.inputSpecs.size()) {
            throw new IllegalArgumentException("The number of provided join conditions must match the number of inputs (" + this.inputSpecs.size() + "), but got " + this.joinConditions.size());
        }
    }

    private void setupKeySelectorsForTestHarness(KeyedMultiInputStreamOperatorTestHarness<RowData, RowData> harness) {
        for (int i = 0; i < this.inputSpecs.size(); ++i) {
            SerializableKeySelector keySelector = new SerializableKeySelector(this.keyExtractor, i);
            harness.setKeySelector(i, (KeySelector)keySelector);
        }
    }

    protected KeyedMultiInputStreamOperatorTestHarness<RowData, RowData> createTestHarness() throws Exception {
        RowType joinKeyType = this.keyExtractor.getCommonJoinKeyType();
        InternalTypeInfo partitionKeyTypeInfo = InternalTypeInfo.of((RowType)joinKeyType);
        GeneratedJoinCondition[] generatedJoinConditions = this.joinConditions.toArray(new GeneratedJoinCondition[0]);
        long[] retentionTime = new long[this.inputSpecs.size()];
        Arrays.fill(retentionTime, 9999999L);
        List internalTypeInfos = this.inputTypeInfos.stream().map(InternalTypeInfo::of).collect(Collectors.toList());
        StreamingMultiJoinOperatorFactory factory = new StreamingMultiJoinOperatorFactory(internalTypeInfos, this.inputSpecs, this.joinTypes, null, retentionTime, generatedJoinConditions, this.keyExtractor, this.joinAttributeMap);
        KeyedMultiInputStreamOperatorTestHarness harness = new KeyedMultiInputStreamOperatorTestHarness((StreamOperatorFactory)factory, (TypeInformation)partitionKeyTypeInfo);
        this.setupKeySelectorsForTestHarness((KeyedMultiInputStreamOperatorTestHarness<RowData, RowData>)harness);
        harness.setStateBackend(this.getStateBackend());
        harness.setCheckpointStorage(this.getCheckpointStorage());
        return harness;
    }

    protected RowType getOutputType() {
        Stream typesStream = this.inputTypeInfos.stream().flatMap(typeInfo -> typeInfo.getChildren().stream());
        Stream namesStream = this.inputTypeInfos.stream().flatMap(typeInfo -> typeInfo.getFieldNames().stream());
        return RowType.of((LogicalType[])((LogicalType[])typesStream.toArray(LogicalType[]::new)), (String[])((String[])namesStream.toArray(String[]::new)));
    }

    protected RowData rowOfKind(RowKind kind, Object ... fields) {
        return StreamRecordUtils.rowOfKind(kind, fields);
    }

    protected Object[] r(Object ... values) {
        return values;
    }

    protected RowType createInputTypeInfo(int inputIndex) {
        return RowType.of((LogicalType[])new LogicalType[]{new CharType(false, 20), new CharType(false, 20), VarCharType.STRING_TYPE}, (String[])new String[]{String.format("user_id_%d", inputIndex), String.format("id_%d", inputIndex), String.format("details_%d", inputIndex)});
    }

    protected InternalTypeInfo<RowData> createUniqueKeyType(int inputIndex) {
        return InternalTypeInfo.of((RowType)RowType.of((LogicalType[])new LogicalType[]{new CharType(false, 20)}, (String[])new String[]{String.format(inputIndex == 0 ? "user_id_%d" : "id_%d", inputIndex)}));
    }

    protected RowDataKeySelector createKeySelector(int inputIndex) {
        return HandwrittenSelectorUtil.getRowDataSelector(new int[]{inputIndex == 0 ? 0 : 1}, this.inputTypeInfos.get(inputIndex).getChildren().toArray(new LogicalType[0]));
    }

    protected static GeneratedJoinCondition createJoinCondition(int rightInputId, final int leftInputId) {
        if (rightInputId <= 0 || leftInputId < 0 || rightInputId == leftInputId) {
            throw new IllegalArgumentException(String.format("Invalid indices for creating join condition. rightInputId: %d, leftInputId: %d", rightInputId, leftInputId));
        }
        String generatedClassName = String.format("SpecificInputsEquiKeyCondition_manual_%d_%d", rightInputId, leftInputId);
        return new GeneratedJoinCondition(generatedClassName, "", new Object[0]){

            public JoinCondition newInstance(ClassLoader classLoader) {
                return new SpecificInputsEquiKeyCondition(leftInputId * 3, 0);
            }
        };
    }

    protected static GeneratedJoinCondition createFieldLongGreaterThanCondition(final int leftFieldIdx, final int rightFieldIdx) {
        String generatedClassName = String.format("FieldLongGreaterThanCondition_manual_%d_%d", leftFieldIdx, rightFieldIdx);
        return new GeneratedJoinCondition(generatedClassName, "", new Object[0]){

            public JoinCondition newInstance(ClassLoader classLoader) {
                return new FieldLongGreaterThanConditionImpl(leftFieldIdx, rightFieldIdx);
            }
        };
    }

    protected static GeneratedJoinCondition createAndCondition(final GeneratedJoinCondition ... generatedConditions) {
        String generatedClassName = "AndJoinCondition_manual";
        return new GeneratedJoinCondition(generatedClassName, "", new Object[0]){

            public JoinCondition newInstance(ClassLoader classLoader) {
                JoinCondition[] actualConditions = new JoinCondition[generatedConditions.length];
                for (int i = 0; i < generatedConditions.length; ++i) {
                    if (generatedConditions[i] == null) {
                        throw new IllegalArgumentException("Null GeneratedJoinCondition passed to createAndCondition");
                    }
                    actualConditions[i] = (JoinCondition)generatedConditions[i].newInstance(classLoader);
                }
                return new AndJoinConditionImpl(actualConditions);
            }
        };
    }

    protected static class AndJoinConditionImpl
    extends AbstractRichFunction
    implements JoinCondition {
        private final JoinCondition[] conditions;

        public AndJoinConditionImpl(JoinCondition ... conditions) {
            this.conditions = conditions;
        }

        public boolean apply(RowData left, RowData right) {
            for (JoinCondition condition : this.conditions) {
                if (condition == null) {
                    return false;
                }
                if (condition.apply(left, right)) continue;
                return false;
            }
            return true;
        }
    }

    protected static class FieldLongGreaterThanConditionImpl
    extends AbstractRichFunction
    implements JoinCondition {
        private final int leftFieldIdx;
        private final int rightFieldIdx;

        public FieldLongGreaterThanConditionImpl(int leftFieldIdx, int rightFieldIdx) {
            this.leftFieldIdx = leftFieldIdx;
            this.rightFieldIdx = rightFieldIdx;
        }

        public boolean apply(RowData left, RowData right) {
            if (left == null || right == null) {
                return false;
            }
            if (left.isNullAt(this.leftFieldIdx) || right.isNullAt(this.rightFieldIdx)) {
                return false;
            }
            return left.getLong(this.leftFieldIdx) > right.getLong(this.rightFieldIdx);
        }
    }

    protected static class SpecificInputsEquiKeyCondition
    extends AbstractRichFunction
    implements JoinCondition {
        private final int leftKeyFieldIndex;
        private final int rightKeyFieldIndex;

        public SpecificInputsEquiKeyCondition(int leftKeyFieldIndex, int rightKeyFieldIndex) {
            this.leftKeyFieldIndex = leftKeyFieldIndex;
            this.rightKeyFieldIndex = rightKeyFieldIndex;
        }

        public boolean apply(RowData left, RowData right) {
            if (left == null || right == null) {
                return false;
            }
            if (left.isNullAt(this.leftKeyFieldIndex) || right.isNullAt(this.rightKeyFieldIndex)) {
                return false;
            }
            String keyLeft = left.getString(this.leftKeyFieldIndex).toString();
            String keyRight = right.getString(this.rightKeyFieldIndex).toString();
            return keyLeft.equals(keyRight);
        }

        private int calculateActualFieldIndex(int inputIndex, int fieldIndex) {
            int actualIndex = fieldIndex;
            for (int i = 0; i < inputIndex; ++i) {
                actualIndex += 3;
            }
            return actualIndex;
        }
    }

    private static class SerializableKeySelector
    implements KeySelector<RowData, RowData>,
    Serializable {
        private static final long serialVersionUID = 1L;
        private final JoinKeyExtractor keyExtractor;
        private final int inputIndex;

        public SerializableKeySelector(JoinKeyExtractor keyExtractor, int inputIndex) {
            this.keyExtractor = keyExtractor;
            this.inputIndex = inputIndex;
        }

        public RowData getKey(RowData value) {
            return this.keyExtractor.getCommonJoinKey(value, this.inputIndex);
        }
    }
}

