package org.apache.flink.table.planner.codegen.runtimefilter;

import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.runtime.operators.runtimefilter.LocalRuntimeFilterBuilderOperatorTest;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarBinaryType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/planner/codegen/runtimefilter/RuntimeFilterCodeGeneratorTest.class */
class RuntimeFilterCodeGeneratorTest {
    private StreamTaskMailboxTestHarness<RowData> testHarness;

    RuntimeFilterCodeGeneratorTest() {
    }

    @BeforeEach
    void setup() throws Exception {
        RowType of = RowType.of(new LogicalType[]{new IntType(), new VarBinaryType()});
        RowType of2 = RowType.of(new LogicalType[]{new VarCharType(), new IntType()});
        this.testHarness = new StreamTaskMailboxTestHarnessBuilder(TwoInputStreamTask::new, InternalTypeInfo.of(of2)).setupOutputForSingletonOperatorChain(RuntimeFilterCodeGenerator.gen(new CodeGeneratorContext(TableConfig.getDefault(), Thread.currentThread().getContextClassLoader()), of, of2, new int[]{0})).addInput(InternalTypeInfo.of(of)).addInput(InternalTypeInfo.of(of2)).build();
    }

    @AfterEach
    void cleanup() throws Exception {
        if (this.testHarness != null) {
            this.testHarness.close();
        }
    }

    @Test
    void testNormalFilter() throws Exception {
        finishBuildPhase(createNormalInput());
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var1", 111), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var3", 333), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var5", 555), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var6", 666), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var8", 888), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var9", 999), 1);
        this.testHarness.processEvent(new EndOfData(StopMode.DRAIN), 1);
        Assertions.assertThat(getOutputRowData()).containsExactly(new GenericRowData[]{GenericRowData.of(new Object[]{"var1", 111}), GenericRowData.of(new Object[]{"var3", 333}), GenericRowData.of(new Object[]{"var5", 555})});
    }

    @Test
    void testOverMaxRowCountLimitFilter() throws Exception {
        finishBuildPhase(createOverMaxRowCountLimitInput());
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var1", 111), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var3", 333), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var5", 555), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var6", 666), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var8", 888), 1);
        this.testHarness.processElement(LocalRuntimeFilterBuilderOperatorTest.createRowDataRecord("var9", 999), 1);
        this.testHarness.processEvent(new EndOfData(StopMode.DRAIN), 1);
        Assertions.assertThat(getOutputRowData()).containsExactly(new GenericRowData[]{GenericRowData.of(new Object[]{"var1", 111}), GenericRowData.of(new Object[]{"var3", 333}), GenericRowData.of(new Object[]{"var5", 555}), GenericRowData.of(new Object[]{"var6", 666}), GenericRowData.of(new Object[]{"var8", 888}), GenericRowData.of(new Object[]{"var9", 999})});
    }

    private void finishBuildPhase(StreamRecord<RowData> streamRecord) throws Exception {
        this.testHarness.processElement(streamRecord, 0);
        this.testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);
    }

    private List<GenericRowData> getOutputRowData() {
        return (List) this.testHarness.getOutput().stream().map(obj -> {
            return (RowData) ((StreamRecord) obj).getValue();
        }).map(rowData -> {
            Assertions.assertThat(rowData.getArity()).isEqualTo(2);
            return GenericRowData.of(new Object[]{rowData.getString(0).toString(), Integer.valueOf(rowData.getInt(1))});
        }).collect(Collectors.toList());
    }

    private static StreamRecord<RowData> createNormalInput() throws Exception {
        StreamTaskMailboxTestHarness createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements = LocalRuntimeFilterBuilderOperatorTest.createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements(5, 10);
        StreamRecord<RowData> streamRecord = (StreamRecord) createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements.getOutput().poll();
        createLocalRuntimeFilterBuilderOperatorHarnessAndProcessElements.close();
        return streamRecord;
    }

    private static StreamRecord<RowData> createOverMaxRowCountLimitInput() {
        return new StreamRecord<>(GenericRowData.of(new Object[]{-1, null}));
    }
}
