/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.runtimefilter;

import java.util.Queue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.operators.util.BloomFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.runtimefilter.GlobalRuntimeFilterBuilderOperator;
import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.BinaryType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link GlobalRuntimeFilterBuilderOperator}.
 *
 * <p>Every test drives the operator through a {@link StreamTaskMailboxTestHarness}: it sends rows
 * of the form {@code (int count, byte[] serializedBloomFilter)}, signals {@link EndOfData}, and
 * then inspects the single two-field row that the operator is expected to emit.
 */
class GlobalRuntimeFilterBuilderOperatorTest {

    /**
     * Two inputs with count 5 each and a max row count of 10: the test expects an output count of
     * 10 and a bloom filter that reports {@code var1}..{@code var10} as present and {@code
     * var11}..{@code var15} as absent.
     */
    @Test
    void testNormalInputAndNormalOutput() throws Exception {
        try (StreamTaskMailboxTestHarness<RowData> testHarness =
                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
            sendLocalFilter(testHarness, 5, createBloomFilter1());
            sendLocalFilter(testHarness, 5, createBloomFilter2());
            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);

            RowData outputRowData = pollSingleOutputRow(testHarness);
            int globalCount = outputRowData.getInt(0);
            BloomFilter globalBloomFilter = BloomFilter.fromBytes(outputRowData.getBinary(1));

            AssertionsForClassTypes.assertThat(globalCount).isEqualTo(10);
            // Keys added by createBloomFilter1() and createBloomFilter2().
            assertMembership(globalBloomFilter, 1, 10, true);
            // Keys that were never added to either input filter.
            assertMembership(globalBloomFilter, 11, 15, false);
        }
    }

    /**
     * Two inputs with count 5 each but a max row count of only 9: the test expects an output count
     * of {@code -1} and a null filter field.
     */
    @Test
    void testNormalInputAndOverMaxRowCountOutput() throws Exception {
        try (StreamTaskMailboxTestHarness<RowData> testHarness =
                createGlobalRuntimeFilterBuilderOperatorHarness(9)) {
            sendLocalFilter(testHarness, 5, createBloomFilter1());
            sendLocalFilter(testHarness, 5, createBloomFilter2());
            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);

            assertOverMaxRowCount(pollSingleOutputRow(testHarness));
        }
    }

    /**
     * One regular input followed by an input whose count is {@code -1} and whose filter is null:
     * the test expects the output to carry a count of {@code -1} and a null filter field as well.
     */
    @Test
    void testOverMaxRowCountInput() throws Exception {
        try (StreamTaskMailboxTestHarness<RowData> testHarness =
                createGlobalRuntimeFilterBuilderOperatorHarness(10)) {
            sendLocalFilter(testHarness, 5, createBloomFilter1());
            sendLocalFilter(testHarness, -1, null);
            testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0);

            assertOverMaxRowCount(pollSingleOutputRow(testHarness));
        }
    }

    /**
     * Sends one {@code (rowCount, serializedFilter)} row to the operator under test.
     *
     * @param testHarness harness wrapping the operator
     * @param rowCount value for the int field of the row
     * @param bloomFilter filter to serialize into the binary field, or {@code null} to send a null
     *     field
     * @throws Exception if the harness fails to process the element
     */
    private static void sendLocalFilter(
            StreamTaskMailboxTestHarness<RowData> testHarness, int rowCount, BloomFilter bloomFilter)
            throws Exception {
        byte[] serializedFilter = bloomFilter == null ? null : BloomFilter.toBytes(bloomFilter);
        testHarness.processElement(
                new StreamRecord<RowData>(GenericRowData.of(rowCount, serializedFilter)));
    }

    /**
     * Asserts that the harness produced exactly one output record, removes it from the output
     * queue, and asserts that its row has two fields.
     *
     * @param testHarness harness whose output is inspected
     * @return the row carried by the single output record
     */
    private static RowData pollSingleOutputRow(StreamTaskMailboxTestHarness<RowData> testHarness) {
        Queue<?> outputs = testHarness.getOutput();
        AssertionsForClassTypes.assertThat(outputs.size()).isEqualTo(1);

        RowData outputRowData = (RowData) ((StreamRecord<?>) outputs.poll()).getValue();
        AssertionsForClassTypes.assertThat(outputRowData.getArity()).isEqualTo(2);
        return outputRowData;
    }

    /**
     * Asserts the output shape both over-max-row-count tests expect: count {@code -1} in field 0
     * and null in field 1.
     */
    private static void assertOverMaxRowCount(RowData outputRowData) {
        AssertionsForClassTypes.assertThat(outputRowData.getInt(0)).isEqualTo(-1);
        AssertionsForClassTypes.assertThat(outputRowData.isNullAt(1)).isTrue();
    }

    /**
     * Asserts that {@code bloomFilter.testHash} returns {@code expected} for the hash codes of
     * {@code "var<first>"} through {@code "var<last>"} (both inclusive).
     */
    private static void assertMembership(
            BloomFilter bloomFilter, int first, int last, boolean expected) {
        for (int i = first; i <= last; i++) {
            String key = "var" + i;
            AssertionsForClassTypes.assertThat(bloomFilter.testHash(key.hashCode()))
                    .as("bloom filter membership of %s", key)
                    .isEqualTo(expected);
        }
    }

    /** Creates a bloom filter holding the hash codes of {@code "var1"}..{@code "var5"}. */
    private static BloomFilter createBloomFilter1() {
        return createBloomFilter(1, 5);
    }

    /** Creates a bloom filter holding the hash codes of {@code "var6"}..{@code "var10"}. */
    private static BloomFilter createBloomFilter2() {
        return createBloomFilter(6, 10);
    }

    /**
     * Creates an on-heap bloom filter via {@code RuntimeFilterUtils.createOnHeapBloomFilter(10)}
     * and adds the hash codes of {@code "var<first>"} through {@code "var<last>"} (both inclusive).
     */
    private static BloomFilter createBloomFilter(int first, int last) {
        // NOTE(review): 10 is presumably the expected number of entries - confirm against
        // RuntimeFilterUtils.
        BloomFilter bloomFilter = RuntimeFilterUtils.createOnHeapBloomFilter(10);
        for (int i = first; i <= last; i++) {
            bloomFilter.addHash(("var" + i).hashCode());
        }
        return bloomFilter;
    }

    /**
     * Builds a single-input task harness around a new {@link GlobalRuntimeFilterBuilderOperator}.
     * Input and output rows are both typed as {@code (IntType, BinaryType)}.
     *
     * @param maxRowCount value passed to the operator's constructor
     * @return the built harness; callers are responsible for closing it
     * @throws Exception if the harness cannot be built
     */
    private static StreamTaskMailboxTestHarness<RowData>
            createGlobalRuntimeFilterBuilderOperatorHarness(int maxRowCount) throws Exception {
        GlobalRuntimeFilterBuilderOperator operator =
                new GlobalRuntimeFilterBuilderOperator(maxRowCount);
        return new StreamTaskMailboxTestHarnessBuilder<>(
                        OneInputStreamTask::new,
                        InternalTypeInfo.ofFields(new IntType(), new BinaryType()))
                .setupOutputForSingletonOperatorChain(operator)
                .addInput(InternalTypeInfo.ofFields(new IntType(), new BinaryType()))
                .build();
    }
}

