/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.dynamicfiltering;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.connector.source.DynamicFilteringData;
import org.apache.flink.table.connector.source.DynamicFilteringEvent;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryStringData;
import org.apache.flink.table.runtime.operators.dynamicfiltering.DynamicFilteringDataCollectorOperator;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link DynamicFilteringDataCollectorOperator}.
 *
 * <p>Each test feeds rows into the operator through a
 * {@link OneInputStreamOperatorTestHarness} and then inspects the single event the operator
 * sent to a {@link MockOperatorEventGateway}.
 */
class DynamicFilteringDataCollectorOperatorTest {

    /**
     * Feeds three identical rows plus one row with a {@code null} string and checks that the
     * operator emits nothing downstream and sends exactly one filtering event that contains the
     * two distinct projected rows.
     */
    @Test
    void testCollectDynamicFilteringData() throws Exception {
        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
        // NOTE(review): -1 presumably disables the size threshold; confirm against the operator.
        DynamicFilteringDataCollectorOperator operator = createOperator(-1L, gateway);

        ConcurrentLinkedQueue<Object> output;
        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
                new OneInputStreamOperatorTestHarness<>(operator)) {
            output = harness.getOutput();
            harness.setup();
            harness.open();

            // The same row three times: the collected data is expected to hold it only once.
            for (long timestamp = 0L; timestamp < 3L; ++timestamp) {
                harness.processElement(rowData(1, 1L, 0, "a"), timestamp);
            }
            harness.processElement(rowData(2, 1L, 0, null), 3L);
        }

        // All assertions run after the harness has been closed.
        Assertions.assertThat(output).isEmpty();

        DynamicFilteringData data = extractSingleDynamicFilteringData(gateway);
        Assertions.assertThat(data.isFiltering()).isTrue();
        Assertions.assertThat(data.getData()).hasSize(2);
        // Expected entries are made of input fields 0, 1 and 3 (field 2 is not selected).
        Assertions.assertThat(data.contains(rowData(1, 1L, "a"))).isTrue();
        Assertions.assertThat(data.contains(rowData(2, 1L, null))).isTrue();
    }

    /**
     * Configures the operator with a threshold of 100, feeds three distinct rows and checks
     * that the single event sent carries data that is flagged as not filtering.
     */
    @Test
    void testExceedsThreshold() throws Exception {
        MockOperatorEventGateway gateway = new MockOperatorEventGateway();
        // NOTE(review): the unit of the threshold is not visible here; the test name suggests
        // the three rows below are expected to exceed it.
        long threshold = 100L;
        DynamicFilteringDataCollectorOperator operator = createOperator(threshold, gateway);

        try (OneInputStreamOperatorTestHarness<RowData, Object> harness =
                new OneInputStreamOperatorTestHarness<>(operator)) {
            harness.setup();
            harness.open();

            harness.processElement(rowData(1, 1L, 0, "a"), 1L);
            harness.processElement(rowData(2, 1L, 0, "b"), 2L);
            harness.processElement(rowData(3, 1L, 0, "c"), 3L);
        }

        DynamicFilteringData data = extractSingleDynamicFilteringData(gateway);
        Assertions.assertThat(data.isFiltering()).isFalse();
    }

    /**
     * Creates the operator under test with an (INT, BIGINT, VARCHAR) row type, the field
     * indexes 0, 1 and 3, the given threshold and the given gateway.
     *
     * @param threshold threshold value passed to the operator's constructor
     * @param gateway gateway that records the events sent by the operator
     * @return a new operator instance
     */
    private static DynamicFilteringDataCollectorOperator createOperator(
            long threshold, MockOperatorEventGateway gateway) {
        RowType rowType = RowType.of(new IntType(), new BigIntType(), new VarCharType());
        List<Integer> indexes = Arrays.asList(0, 1, 3);
        // The first constructor argument is deliberately null, as in the original tests.
        return new DynamicFilteringDataCollectorOperator(null, rowType, indexes, threshold, gateway);
    }

    /**
     * Asserts that exactly one event was sent to the gateway, that it is a
     * {@link SourceEventWrapper} around a {@link DynamicFilteringEvent}, and returns the data
     * carried by that event.
     *
     * @param gateway gateway the operator under test sent its events to
     * @return the {@link DynamicFilteringData} of the single event
     */
    private static DynamicFilteringData extractSingleDynamicFilteringData(
            MockOperatorEventGateway gateway) {
        List<OperatorEvent> events = gateway.getEventsSent();
        Assertions.assertThat(events).hasSize(1);

        OperatorEvent event = events.get(0);
        Assertions.assertThat(event).isInstanceOf(SourceEventWrapper.class);

        SourceEvent sourceEvent = ((SourceEventWrapper) event).getSourceEvent();
        Assertions.assertThat(sourceEvent).isInstanceOf(DynamicFilteringEvent.class);

        return ((DynamicFilteringEvent) sourceEvent).getData();
    }

    /**
     * Builds a {@link GenericRowData} from the given values, wrapping every {@link String} in
     * a {@link BinaryStringData}; all other values (including {@code null}) are stored as-is.
     *
     * @param values field values in positional order
     * @return a row with one field per value
     */
    private static RowData rowData(Object... values) {
        GenericRowData row = new GenericRowData(values.length);
        for (int pos = 0; pos < values.length; ++pos) {
            Object value = values[pos];
            if (value instanceof String) {
                value = new BinaryStringData((String) value);
            }
            row.setField(pos, value);
        }
        return row;
    }
}

