/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.stream;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.join.stream.bundle.BufferBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasNoUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.InputSideHasUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.bundle.JoinKeyContainsUniqueKeyBundle;
import org.apache.flink.table.runtime.operators.join.stream.utils.JoinInputSideSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.CharType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.utils.HandwrittenSelectorUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class BufferBundleTest {
    /** Bundle under test; every test method assigns the implementation it exercises. */
    private BufferBundle<?> buffer;

    /** Row layout shared by all test records: (order_id, line_order_id, shipping_address). */
    private final InternalTypeInfo<RowData> inputTypeInfo =
            InternalTypeInfo.of(
                    RowType.of(
                            new LogicalType[] {
                                new CharType(false, 20),
                                new CharType(false, 20),
                                VarCharType.STRING_TYPE
                            },
                            new String[] {"order_id", "line_order_id", "shipping_address"}));

    /** Extracts field 0 ({@code order_id}) of a row; used as the join key. */
    private final RowDataKeySelector joinKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {0},
                    inputTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));

    /** Extracts field 1 ({@code line_order_id}) of a row; used as the unique key. */
    private final RowDataKeySelector uniqueKeySelector =
            HandwrittenSelectorUtil.getRowDataSelector(
                    new int[] {1},
                    inputTypeInfo.toRowType().getChildren().toArray(new LogicalType[0]));

    /** Input-side spec built from {@link #uniqueKeySelector}; supplies the unique-key selector. */
    private final JoinInputSideSpec inputSideHasUniqueKey =
            JoinInputSideSpec.withUniqueKey(inputTypeInfo, uniqueKeySelector);

    /** Package-private no-arg constructor; performs no work beyond the field initializers. */
    BufferBundleTest() {
    }

    /**
     * Clears the bundle after each test.
     *
     * <p>{@code buffer} is only assigned inside the test bodies, so it can still be {@code null}
     * here if a test aborted before reaching that assignment. The guard keeps this hook from
     * throwing a {@link NullPointerException} on top of the original failure.
     */
    @AfterEach
    void cleanup() {
        if (buffer != null) {
            buffer.clear();
        }
    }

    /**
     * Runs the accumulate-then-accumulate scenario against both unique-key bundle variants.
     *
     * @param joinKeyContainsUniqueKey {@code true} selects {@link JoinKeyContainsUniqueKeyBundle},
     *     {@code false} selects {@link InputSideHasUniqueKeyBundle}
     */
    @ParameterizedTest(name = "joinKeyContainsUniqueKey: {0}")
    @ValueSource(booleans = {true, false})
    void testAccumulateAccumulatePattern(boolean joinKeyContainsUniqueKey) throws Exception {
        if (joinKeyContainsUniqueKey) {
            buffer = new JoinKeyContainsUniqueKeyBundle();
        } else {
            buffer = new InputSideHasUniqueKeyBundle();
        }
        testAccumulateWithAccumulatePatternInternal();
    }

    /**
     * Runs the accumulate-then-retract scenario against both unique-key bundle variants.
     *
     * @param joinKeyContainsUniqueKey {@code true} selects {@link JoinKeyContainsUniqueKeyBundle},
     *     {@code false} selects {@link InputSideHasUniqueKeyBundle}
     */
    @ParameterizedTest(name = "joinKeyContainsUniqueKey: {0}")
    @ValueSource(booleans = {true, false})
    void testAccumulateRetractPattern(boolean joinKeyContainsUniqueKey) throws Exception {
        if (joinKeyContainsUniqueKey) {
            buffer = new JoinKeyContainsUniqueKeyBundle();
        } else {
            buffer = new InputSideHasUniqueKeyBundle();
        }
        testAccumulateWithRetractPatternInternal();
    }

    /**
     * Runs the retract-then-accumulate scenario against both unique-key bundle variants.
     *
     * @param joinKeyContainsUniqueKey {@code true} selects {@link JoinKeyContainsUniqueKeyBundle},
     *     {@code false} selects {@link InputSideHasUniqueKeyBundle}
     */
    @ParameterizedTest(name = "joinKeyContainsUniqueKey: {0}")
    @ValueSource(booleans = {true, false})
    void testRetractAccumulatePattern(boolean joinKeyContainsUniqueKey) throws Exception {
        if (joinKeyContainsUniqueKey) {
            buffer = new JoinKeyContainsUniqueKeyBundle();
        } else {
            buffer = new InputSideHasUniqueKeyBundle();
        }
        testRetractWithAccumulatePatternInternal();
    }

    /**
     * Runs the retract-then-retract scenario against both unique-key bundle variants.
     *
     * @param joinKeyContainsUniqueKey {@code true} selects {@link JoinKeyContainsUniqueKeyBundle},
     *     {@code false} selects {@link InputSideHasUniqueKeyBundle}
     */
    @ParameterizedTest(name = "joinKeyContainsUniqueKey: {0}")
    @ValueSource(booleans = {true, false})
    void testRetractRetractPattern(boolean joinKeyContainsUniqueKey) throws Exception {
        if (joinKeyContainsUniqueKey) {
            buffer = new JoinKeyContainsUniqueKeyBundle();
        } else {
            buffer = new InputSideHasUniqueKeyBundle();
        }
        testRetractWithRetractPatternInternal();
    }

    /**
     * Runs the mixed accumulate/retract scenario against both unique-key bundle variants.
     *
     * @param joinKeyContainsUniqueKey {@code true} selects {@link JoinKeyContainsUniqueKeyBundle},
     *     {@code false} selects {@link InputSideHasUniqueKeyBundle}
     */
    @ParameterizedTest(name = "joinKeyContainsUniqueKey: {0}")
    @ValueSource(booleans = {true, false})
    void testPatternCombination(boolean joinKeyContainsUniqueKey) throws Exception {
        if (joinKeyContainsUniqueKey) {
            buffer = new JoinKeyContainsUniqueKeyBundle();
        } else {
            buffer = new InputSideHasUniqueKeyBundle();
        }
        testPatternCombinationInternal();
    }

    /**
     * Feeds sixteen changelog records into an {@link InputSideHasNoUniqueKeyBundle} and expects
     * the buffer to end up holding no records, with {@code reducedSize()} equal to the number of
     * records added.
     */
    @Test
    void testInputSideHasNoUniqueKey() throws Exception {
        buffer = new InputSideHasNoUniqueKeyBundle();

        List<RowData> input =
                Stream.of(
                                StreamRecordUtils.deleteRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#90", "LineOrd#90", "110 Bellevue Drive, Pottstown, PI 19464"),
                                StreamRecordUtils.insertRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464"),
                                StreamRecordUtils.insertRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"),
                                StreamRecordUtils.deleteRecord("Ord#1", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#90", "LineOrd#90", "110 Bellevue Drive, Pottstown, PI 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PD 19464"),
                                StreamRecordUtils.deleteRecord("Ord#3", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.deleteRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#10", "LineOrd#10", "14 Bellevue Drive, Pottstown, PJ 19464"),
                                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "8 Bellevue Drive, Pottstown, PF 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#9", "11 Bellevue Drive, Pottstown, PI 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());

        addRecordsToBuffer(input);

        // Nothing is expected to remain buffered.
        int inputSize = input.size();
        assertRecords(Collections.emptyList(), inputSize);
    }

    /**
     * Adds two insert/update-after records for each of four (order_id, line_order_id) pairs and
     * expects only the later record of every pair to remain in the current {@code buffer}.
     */
    private void testAccumulateWithAccumulatePatternInternal() throws Exception {
        List<RowData> input =
                Stream.of(
                                StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PJ 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PK 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "8 Bellevue Drive, Pottstown, PCxx 19464"),
                                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "10 Bellevue Drive, Pottstown, PJyy 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());
        addRecordsToBuffer(input);

        List<RowData> remaining =
                Stream.of(
                                StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "8 Bellevue Drive, Pottstown, PCxx 19464"),
                                StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "10 Bellevue Drive, Pottstown, PJyy 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());

        assertRecords(remaining, input.size());
        assertJoinKeys(remaining);
    }

    /**
     * Adds an insert/update-after record followed by an update-before/delete record for each of
     * four (order_id, line_order_id) pairs and expects the current {@code buffer} to hold nothing
     * afterwards.
     */
    private void testAccumulateWithRetractPatternInternal() throws Exception {
        List<RowData> input =
                Stream.of(
                                StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.insertRecord("Ord#7", "LineOrd#7", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#8", "LineOrd#8", "5 Bellevue Drive, Pottstown, PJ 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#9", "6 Bellevue Drive, Pottstown, PK 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#6", "LineOrd#6", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#8", "LineOrd#8", "8 Bellevue Drive, Pottstown, PCxx 19464"),
                                StreamRecordUtils.deleteRecord("Ord#7", "LineOrd#7", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.deleteRecord("Ord#9", "LineOrd#9", "10 Bellevue Drive, Pottstown, PJyy 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());
        addRecordsToBuffer(input);

        // No record is expected to survive.
        int inputSize = input.size();
        assertRecords(Collections.emptyList(), inputSize);
        assertJoinKeys(Collections.emptyList());
    }

    /**
     * Adds an update-before/delete record followed by an insert/update-after record for each of
     * four (order_id, line_order_id) pairs and expects all eight records to remain in the current
     * {@code buffer}.
     */
    private void testRetractWithAccumulatePatternInternal() throws Exception {
        List<RowData> input =
                Stream.of(
                                StreamRecordUtils.updateBeforeRecord("Ord#14", "LineOrd#14", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#15", "LineOrd#15", "8 Bellevue Drive, Pottstown, PCxx 19464"),
                                StreamRecordUtils.deleteRecord("Ord#16", "LineOrd#16", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.deleteRecord("Ord#17", "LineOrd#17", "10 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.insertRecord("Ord#14", "LineOrd#14", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.insertRecord("Ord#16", "LineOrd#16", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#15", "LineOrd#15", "5 Bellevue Drive, Pottstown, PJ 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#17", "LineOrd#17", "6 Bellevue Drive, Pottstown, PK 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());
        addRecordsToBuffer(input);

        List<RowData> remaining =
                Stream.of(
                                StreamRecordUtils.updateBeforeRecord("Ord#14", "LineOrd#14", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.insertRecord("Ord#14", "LineOrd#14", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.deleteRecord("Ord#16", "LineOrd#16", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.insertRecord("Ord#16", "LineOrd#16", "4 Bellevue Drive, Pottstown, PB 19464"),
                                StreamRecordUtils.deleteRecord("Ord#17", "LineOrd#17", "10 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#17", "LineOrd#17", "6 Bellevue Drive, Pottstown, PK 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#15", "LineOrd#15", "8 Bellevue Drive, Pottstown, PCxx 19464"),
                                StreamRecordUtils.updateAfterRecord("Ord#15", "LineOrd#15", "5 Bellevue Drive, Pottstown, PJ 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());

        assertRecords(remaining, input.size());
        assertJoinKeys(remaining);
    }

    /**
     * Adds two update-before/delete records for each of two (order_id, line_order_id) pairs and
     * expects only the later record of every pair to remain in the current {@code buffer}.
     */
    private void testRetractWithRetractPatternInternal() throws Exception {
        List<RowData> input =
                Stream.of(
                                StreamRecordUtils.updateBeforeRecord("Ord#18", "LineOrd#18", "7 Bellevue Drive, Pottstown, PL 19464"),
                                StreamRecordUtils.deleteRecord("Ord#19", "LineOrd#19", "9 Bellevue Drive, Pottstown, PJyy 19464"),
                                StreamRecordUtils.deleteRecord("Ord#18", "LineOrd#18", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#19", "LineOrd#19", "4 Bellevue Drive, Pottstown, PB 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());
        addRecordsToBuffer(input);

        List<RowData> remaining =
                Stream.of(
                                StreamRecordUtils.deleteRecord("Ord#18", "LineOrd#18", "3 Bellevue Drive, Pottstown, PA 19464"),
                                StreamRecordUtils.updateBeforeRecord("Ord#19", "LineOrd#19", "4 Bellevue Drive, Pottstown, PB 19464"))
                        .map(StreamRecord::getValue)
                        .collect(Collectors.toList());

        assertRecords(remaining, input.size());
        assertJoinKeys(remaining);
    }

    private void testPatternCombinationInternal() throws Exception {
        List<RowData> records = Stream.of(StreamRecordUtils.updateAfterRecord("Ord#4", "LineOrd#4", "6 Bellevue Drive, Pottstown, PK 19464"), StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "9 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "10 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.insertRecord("Ord#1", "LineOrd#1", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "5 Bellevue Drive, Pottstown, PJ 19464"), StreamRecordUtils.insertRecord("Ord#6", "LineOrd#6", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.insertRecord("Ord#7", "LineOrd#7", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.updateAfterRecord("Ord#8", "LineOrd#8", "5 Bellevue Drive, Pottstown, PJ 19464"), StreamRecordUtils.updateAfterRecord("Ord#9", "LineOrd#9", "6 Bellevue Drive, Pottstown, PK 19464"), StreamRecordUtils.updateBeforeRecord("Ord#6", "LineOrd#6", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "8 Bellevue Drive, Pottstown, PCxx 19464"), StreamRecordUtils.updateBeforeRecord("Ord#8", "LineOrd#8", "8 Bellevue Drive, Pottstown, PCxx 19464"), StreamRecordUtils.updateBeforeRecord("Ord#15", "LineOrd#15", "8 Bellevue Drive, Pottstown, PCxx 19464"), StreamRecordUtils.deleteRecord("Ord#16", "LineOrd#16", "9 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.deleteRecord("Ord#7", "LineOrd#7", "9 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.deleteRecord("Ord#9", "LineOrd#9", "10 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.updateBeforeRecord("Ord#14", "LineOrd#14", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.deleteRecord("Ord#17", "LineOrd#17", "10 Bellevue Drive, 
Pottstown, PJyy 19464"), StreamRecordUtils.updateAfterRecord("Ord#17", "LineOrd#17", "6 Bellevue Drive, Pottstown, PK 19464"), StreamRecordUtils.updateBeforeRecord("Ord#18", "LineOrd#18", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.insertRecord("Ord#14", "LineOrd#14", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.insertRecord("Ord#16", "LineOrd#16", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.updateAfterRecord("Ord#15", "LineOrd#15", "5 Bellevue Drive, Pottstown, PJ 19464"), StreamRecordUtils.deleteRecord("Ord#19", "LineOrd#19", "9 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.deleteRecord("Ord#18", "LineOrd#18", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.updateBeforeRecord("Ord#19", "LineOrd#19", "4 Bellevue Drive, Pottstown, PB 19464")).map(StreamRecord::getValue).collect(Collectors.toList());
        this.addRecordsToBuffer(records);
        List<RowData> result = Stream.of(StreamRecordUtils.updateAfterRecord("Ord#1", "LineOrd#1", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.updateBeforeRecord("Ord#14", "LineOrd#14", "7 Bellevue Drive, Pottstown, PL 19464"), StreamRecordUtils.insertRecord("Ord#14", "LineOrd#14", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.deleteRecord("Ord#18", "LineOrd#18", "3 Bellevue Drive, Pottstown, PA 19464"), StreamRecordUtils.updateAfterRecord("Ord#3", "LineOrd#3", "8 Bellevue Drive, Pottstown, PCxx 19464"), StreamRecordUtils.deleteRecord("Ord#16", "LineOrd#16", "9 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.insertRecord("Ord#16", "LineOrd#16", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.deleteRecord("Ord#17", "LineOrd#17", "10 Bellevue Drive, Pottstown, PJyy 19464"), StreamRecordUtils.updateAfterRecord("Ord#17", "LineOrd#17", "6 Bellevue Drive, Pottstown, PK 19464"), StreamRecordUtils.insertRecord("Ord#2", "LineOrd#2", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.updateBeforeRecord("Ord#15", "LineOrd#15", "8 Bellevue Drive, Pottstown, PCxx 19464"), StreamRecordUtils.updateAfterRecord("Ord#15", "LineOrd#15", "5 Bellevue Drive, Pottstown, PJ 19464"), StreamRecordUtils.updateBeforeRecord("Ord#19", "LineOrd#19", "4 Bellevue Drive, Pottstown, PB 19464"), StreamRecordUtils.insertRecord("Ord#4", "LineOrd#4", "10 Bellevue Drive, Pottstown, PJyy 19464")).map(StreamRecord::getValue).collect(Collectors.toList());
        this.assertRecords(result, records.size());
        this.assertJoinKeys(result);
    }

    /**
     * Checks the records currently held by {@code buffer} and its reduced-size counter.
     *
     * <p>All record lists returned by {@code buffer.getRecords()} are flattened into one list and
     * compared with {@code expectedRecords} irrespective of order. {@code buffer.reducedSize()}
     * must equal {@code inputSize - expectedRecords.size()}.
     *
     * @param expectedRecords records expected to remain in the buffer
     * @param inputSize number of records that were added to the buffer
     * @throws Exception if reading the records from the buffer fails
     */
    private void assertRecords(List<RowData> expectedRecords, int inputSize) throws Exception {
        List<RowData> actualRecords =
                buffer.getRecords().values().stream()
                        .flatMap(Collection::stream)
                        .collect(Collectors.toList());

        // assertEquals takes (expected, actual); keeping that order makes the failure message
        // report the two numbers the right way round.
        Assertions.assertEquals(
                inputSize - expectedRecords.size(),
                buffer.reducedSize(),
                "unexpected reducedSize()");
        Assertions.assertTrue(
                CollectionUtils.isEqualCollection(actualRecords, expectedRecords),
                () ->
                        "buffered records differ from expectation (order ignored); expected "
                                + expectedRecords
                                + " but was "
                                + actualRecords);
    }

    /**
     * Checks, per join key, the unique-key-to-records mapping reported by {@code buffer}.
     *
     * <p>The check only runs when {@code buffer} is an {@link InputSideHasUniqueKeyBundle}; for
     * any other bundle this method does nothing. The expected mapping is built by grouping {@code
     * expectedRecords} by join key and then by unique key. Only join keys reported by {@code
     * buffer.getJoinKeys()} are compared, so a join key that appears in {@code expectedRecords}
     * but not in the buffer is not detected here.
     *
     * @param expectedRecords records expected to remain in the buffer
     * @throws Exception if key extraction or reading from the buffer fails
     */
    private void assertJoinKeys(List<RowData> expectedRecords) throws Exception {
        if (!(buffer instanceof InputSideHasUniqueKeyBundle)) {
            return;
        }

        // join key -> unique key -> records, preserving the order of expectedRecords per list
        Map<RowData, Map<RowData, List<RowData>>> expectedByJoinKey = new HashMap<>();
        for (RowData record : expectedRecords) {
            RowData joinKey = joinKeySelector.getKey(record);
            RowData uniqueKey = uniqueKeySelector.getKey(record);
            expectedByJoinKey
                    .computeIfAbsent(joinKey, k -> new HashMap<>())
                    .computeIfAbsent(uniqueKey, k -> new ArrayList<>())
                    .add(record);
        }

        for (RowData joinKey : buffer.getJoinKeys()) {
            // assertEquals takes (expected, actual).
            Assertions.assertEquals(
                    expectedByJoinKey.get(joinKey), buffer.getRecordsWithJoinKey(joinKey));
        }
    }

    /**
     * Adds every row of {@code input}, in order, to {@code buffer}.
     *
     * <p>The join key is always extracted with {@code joinKeySelector}. A unique key is extracted
     * (via the selector of {@code inputSideHasUniqueKey}) only when {@code buffer} is an {@link
     * InputSideHasUniqueKeyBundle}; otherwise {@code null} is passed as the unique key.
     *
     * @param input rows to add
     * @throws Exception if key extraction fails
     */
    private void addRecordsToBuffer(List<RowData> input) throws Exception {
        // The bundle instance does not change while the rows are being added.
        final boolean extractUniqueKey = buffer instanceof InputSideHasUniqueKeyBundle;

        for (RowData row : input) {
            RowData joinKey = joinKeySelector.getKey(row);

            RowData uniqueKey = null;
            if (extractUniqueKey) {
                KeySelector<RowData, RowData> selector =
                        inputSideHasUniqueKey.getUniqueKeySelector();
                Assertions.assertNotNull(selector);
                uniqueKey = selector.getKey(row);
            }

            buffer.addRecord(joinKey, uniqueKey, row);
        }
    }
}

