/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.bridge.scala.package$;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.FailingCollectionSource;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.math.Ordering;
import scala.reflect.ScalaSignature;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001i4A!\u0001\u0002\u0001'\t\u0001r+\u001b8e_^\u0014\u0016M\\6J)\u000e\u000b7/\u001a\u0006\u0003\u0007\u0011\t1a]9m\u0015\t)a!\u0001\u0004tiJ,\u0017-\u001c\u0006\u0003\u000f!\tqA];oi&lWM\u0003\u0002\n\u0015\u00059\u0001\u000f\\1o]\u0016\u0014(BA\u0006\r\u0003\u0015!\u0018M\u00197f\u0015\tia\"A\u0003gY&t7N\u0003\u0002\u0010!\u00051\u0011\r]1dQ\u0016T\u0011!E\u0001\u0004_J<7\u0001A\n\u0003\u0001Q\u0001\"!\u0006\r\u000e\u0003YQ!a\u0006\u0004\u0002\u000bU$\u0018\u000e\\:\n\u0005e1\"AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0007\u0002C\u000e\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u000f\u0002\t5|G-\u001a\t\u0003;Er!AH\u0018\u000f\u0005}qcB\u0001\u0011.\u001d\t\tCF\u0004\u0002#W9\u00111E\u000b\b\u0003I%r!!\n\u0015\u000e\u0003\u0019R!a\n\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012BA\b\u0011\u0013\tia\"\u0003\u0002\f\u0019%\u0011\u0011BC\u0005\u0003\u000f!I!a\u0006\u0004\n\u0005A2\u0012AG*ue\u0016\fW.\u001b8h/&$\bn\u0015;bi\u0016$Vm\u001d;CCN,\u0017B\u0001\u001a4\u0005A\u0019F/\u0019;f\u0005\u0006\u001c7.\u001a8e\u001b>$WM\u0003\u00021-!)Q\u0007\u0001C\u0001m\u00051A(\u001b8jiz\"\"aN\u001d\u0011\u0005a\u0002Q\"\u0001\u0002\t\u000bm!\u0004\u0019\u0001\u000f\t\u000bm\u0002A\u0011\t\u001f\u0002\r\t,gm\u001c:f)\u0005i\u0004C\u0001 
B\u001b\u0005y$\"\u0001!\u0002\u000bM\u001c\u0017\r\\1\n\u0005\t{$\u0001B+oSRD#A\u000f#\u0011\u0005\u0015CU\"\u0001$\u000b\u0005\u001d\u0003\u0012!\u00026v]&$\u0018BA%G\u0005\u0019\u0011UMZ8sK\")1\n\u0001C\u0001y\u0005IB/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V/\u001c2mK^Kg\u000eZ8xQ\tQU\n\u0005\u0002F\u001d&\u0011qJ\u0012\u0002\u0005)\u0016\u001cH\u000fC\u0003R\u0001\u0011\u0005A(A\u0014uKN$XI^3oiRKW.\u001a+v[\ndWmV5oI><x+\u001b;i%\u0006t7n\u00144gg\u0016$\bF\u0001)N\u0011\u0015!\u0006\u0001\"\u0001=\u0003)\"Xm\u001d;Fm\u0016tG\u000fV5nKR+XN\u00197f/&tGm\\<XSRDw.\u001e;SC:\\g*^7cKJD#aU'\t\u000b]\u0003A\u0011\u0001\u001f\u0002-Q,7\u000f^#wK:$H+[7f\u0011>\u0004x+\u001b8e_^D#AV'\t\u000bi\u0003A\u0011\u0001\u001f\u0002IQ,7\u000f^#wK:$H+[7f\u0011>\u0004x+\u001b8e_^<\u0016\u000e\u001e5SC:\\wJ\u001a4tKRD#!W'\t\u000bu\u0003A\u0011\u0001\u001f\u0002OQ,7\u000f^#wK:$H+[7f\u0011>\u0004x+\u001b8e_^<\u0016\u000e\u001e5pkR\u0014\u0016M\\6Ok6\u0014WM\u001d\u0015\u000396CQ\u0001\u0019\u0001\u0005\u0002q\n1\u0004^3ti\u00163XM\u001c;US6,7)^7vY\u0006$XmV5oI><\bFA0N\u0011\u0015\u0019\u0007\u0001\"\u0001=\u0003%\"Xm\u001d;Fm\u0016tG\u000fV5nK\u000e+X.\u001e7bi\u0016<\u0016N\u001c3po^KG\u000f\u001b*b].|eMZ:fi\"\u0012!-\u0014\u0005\u0006M\u0002!\t\u0001P\u0001-i\u0016\u001cH/\u0012<f]R$\u0016.\\3Dk6,H.\u0019;f/&tGm\\<XSRDw.\u001e;SC:\\g*^7cKJD#!Z'\t\u000b%\u0004A\u0011\u0001\u001f\u0002\u0011Q,7\u000f\u001e+paFB#\u0001[')\t\u0001a'o\u001d\t\u0003[Bl\u0011A\u001c\u0006\u0003_\u001a\u000baA];o]\u0016\u0014\u0018BA9o\u0005\u001d\u0011VO\\,ji\"\fQA^1mk\u0016\u001c\u0013\u0001\u001e\t\u0003kbl\u0011A\u001e\u0006\u0003o\u001a\u000bqA];o]\u0016\u00148/\u0003\u0002zm\ni\u0001+\u0019:b[\u0016$XM]5{K\u0012\u0004")
public class WindowRankITCase
extends StreamingWithStateTestBase {
    /**
     * Prepares the environment shared by every test in this class.
     *
     * <p>After the base-class setup this enables {@code EXACTLY_ONCE} checkpointing with an
     * interval argument of 100, installs {@code fixedDelayRestart(1, 0)} as the restart
     * strategy and resets {@link FailingCollectionSource}. It then registers the
     * {@code windowDataWithTimestamp} test data, creates table {@code T1} over it with the
     * {@code values} connector and {@code 'failing-source' = 'true'}, and registers the
     * {@code concat_distinct_agg} aggregate function that the queries call.
     */
    @Override
    @Before
    public void before() {
        super.before();

        env().enableCheckpointing(100L, CheckpointingMode.EXACTLY_ONCE);
        env().setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
        FailingCollectionSource.reset();

        String registeredDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.windowDataWithTimestamp());

        // The DDL is split around the single placeholder that receives the registered data id.
        Seq ddlFragments = (Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{
            "\n        |CREATE TABLE T1 (\n        | `ts` STRING,\n        | `int` INT,\n        | `double` DOUBLE,\n        | `float` FLOAT,\n        | `bigdec` DECIMAL(10, 2),\n        | `string` STRING,\n        | `name` STRING,\n        | `rowtime` AS TO_TIMESTAMP(`ts`),\n        | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '1' SECOND\n        |) WITH (\n        | 'connector' = 'values',\n        | 'data-id' = '",
            "',\n        | 'failing-source' = 'true'\n        |)\n        |"
        });
        Seq ddlArguments = (Seq)Predef$.MODULE$.genericWrapArray((Object)new Object[]{registeredDataId});
        String interpolatedDdl = new StringContext(ddlFragments).s(ddlArguments);
        String createTableDdl = new StringOps(Predef$.MODULE$.augmentString(interpolatedDdl)).stripMargin();
        tEnv().executeSql(createTableDdl);

        tEnv().createFunction("concat_distinct_agg", JavaUserDefinedAggFunctions.ConcatDistinctAggFunction.class);
    }

    /**
     * Ranks the per-name aggregates of each 5-second TUMBLE window of {@code T1} by
     * {@code sum_b} descending and keeps rows with {@code rownum <= 2}.
     *
     * <p>The rows collected by the append sink are compared with the expected rows after both
     * sides are sorted as strings, so emission order does not matter.
     */
    @Test
    public void testEventTimeTumbleWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Columns: name, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str, rownum.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Same ranking as the plain TUMBLE test, but filtered with
     * {@code rownum > 1 AND rownum <= 2}, so only second-ranked rows are expected.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeTumbleWindowWithRankOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Every expected row carries rownum = 2 in its last column.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * TUMBLE-window ranking filtered with {@code rownum > 1 AND rownum <= 2} where the outer
     * SELECT lists the aggregate columns explicitly and leaves {@code rownum} out of the output.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeTumbleWindowWithoutRankNumber() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Expected rows end with distinct_str; there is no trailing rownum column.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00:05,2020-10-10T00:00:10,1,3.33,null,3.0,1,Comment#2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Ranks the per-name aggregates of each window produced by
     * {@code HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND)} by
     * {@code sum_b} descending and keeps rows with {@code rownum <= 2}.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeHopWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Columns: name, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str, rownum.
        String[] expectedRows = new String[]{
            "a,2020-10-09T23:59:55,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Comment#2|Hi|Comment#1,1",
            "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,1",
            "b,2020-10-10T00:00:10,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2",
            "null,2020-10-10T00:00:25,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * HOP-window ranking (same HOP call as the plain HOP test) filtered with
     * {@code rownum > 1 AND rownum <= 2}, so only second-ranked rows are expected.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeHopWindowWithRankOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT *\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Every expected row carries rownum = 2 in its last column.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * HOP-window ranking filtered with {@code rownum > 1 AND rownum <= 2} where the outer
     * SELECT lists the aggregate columns explicitly and leaves {@code rownum} out of the output.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeHopWindowWithoutRankNumber() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |  FROM TABLE(\n        |    HOP(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '10' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Expected rows end with distinct_str; there is no trailing rownum column.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00:05,2020-10-10T00:00:15,1,3.33,null,3.0,1,Comment#2",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi",
            "b,2020-10-10T00:00:25,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Ranks the per-name aggregates of each window produced by
     * {@code CUMULATE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND, INTERVAL '15' SECOND)}
     * by {@code sum_b} descending and keeps rows with {@code rownum <= 2}.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeCumulateWindow() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Columns: name, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str, rownum.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,5.0,1.0,2,Hi|Comment#1,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:10,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1",
            "a,2020-10-10T00:00,2020-10-10T00:00:15,6,19.98,5.0,1.0,3,Hi|Comment#1|Comment#2,1",
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:25,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:30,1,4.44,4.0,4.0,1,Hi,1",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0,null,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0,null,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * CUMULATE-window ranking (same CUMULATE call as the plain CUMULATE test) filtered with
     * {@code rownum > 1 AND rownum <= 2}, so only second-ranked rows are expected.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeCumulateWindowWithRankOffset() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Every expected row carries rownum = 2 in its last column.
        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3,2",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3,2"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * CUMULATE-window ranking filtered with {@code rownum > 1 AND rownum <= 2} where the outer
     * SELECT lists the aggregate columns explicitly and leaves {@code rownum} out of the output.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testEventTimeCumulateWindowWithoutRankNumber() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT `name`, window_start, window_end, cnt, sum_b, max_d, min_f, uv, distinct_str\n        |FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b,\n        |      MAX(`double`) as max_d,\n        |      MIN(`float`) as min_f,\n        |      COUNT(DISTINCT `string`) as uv,\n        |      concat_distinct_agg(`string`) as distinct_str\n        |    FROM TABLE(\n        |      CUMULATE(\n        |        TABLE T1,\n        |        DESCRIPTOR(rowtime),\n        |        INTERVAL '5' SECOND,\n        |        INTERVAL '15' SECOND))\n        |    GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum > 1 AND rownum <= 2\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Expected rows end with distinct_str; there is no trailing rownum column.
        String[] expectedRows = new String[]{
            "b,2020-10-10T00:00,2020-10-10T00:00:10,2,6.66,6.0,3.0,2,Hello|Hi",
            "b,2020-10-10T00:00,2020-10-10T00:00:15,2,6.66,6.0,3.0,2,Hello|Hi",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1,Comment#3",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1,Comment#3",
            "b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1,Comment#3"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Keeps only the top-ranked row ({@code rownum <= 1}) of each 5-second TUMBLE window of
     * {@code T1}, ranking the per-name {@code COUNT(*)} / {@code SUM(`bigdec`)} aggregates by
     * {@code sum_b} descending.
     *
     * <p>Sink output and expected rows are compared after sorting both as strings.
     */
    @Test
    public void testTop1() {
        String query = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT * FROM\n        |(\n        |  SELECT *,\n        |    ROW_NUMBER() OVER(\n        |      PARTITION BY window_start, window_end ORDER BY sum_b DESC) as rownum\n        |  FROM (\n        |    SELECT\n        |      `name`,\n        |      window_start,\n        |      window_end,\n        |      COUNT(*) as cnt,\n        |      SUM(`bigdec`) as sum_b\n        |    FROM TABLE(\n        |      TUMBLE(TABLE T1, DESCRIPTOR(rowtime), INTERVAL '5' SECOND))\n        |  GROUP BY `name`, window_start, window_end\n        |  )\n        |)\n        |WHERE rownum <= 1\n      ")).stripMargin();
        TestingAppendSink resultSink = new TestingAppendSink();
        package$.MODULE$
            .tableConversions(tEnv().sqlQuery(query))
            .toAppendStream(TypeExtractor.createTypeInfo(Row.class))
            .addSink((SinkFunction)resultSink);
        env().execute();

        // Columns: name, window_start, window_end, cnt, sum_b, rownum.
        String[] expectedRows = new String[]{
            "a,2020-10-10T00:00,2020-10-10T00:00:05,4,11.10,1",
            "b,2020-10-10T00:00:05,2020-10-10T00:00:10,2,6.66,1",
            "b,2020-10-10T00:00:15,2020-10-10T00:00:20,1,4.44,1",
            "null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,1"
        };
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        Ordering byString = (Ordering)Ordering.String$.MODULE$;
        String expectedText = ((TraversableOnce)expected.sorted(byString)).mkString("\n");
        String actualText = ((TraversableOnce)resultSink.getAppendResults().sorted(byString)).mkString("\n");
        Assert.assertEquals((Object)expectedText, (Object)actualText);
    }

    /**
     * Creates the test case for one state backend mode.
     *
     * <p>The value is passed straight to the {@link StreamingWithStateTestBase} constructor.
     * NOTE(review): presumably the JUnit parameterized runner supplies it from a parameters
     * method declared on the base class, which is not visible in this file; confirm there.
     *
     * @param mode state backend mode forwarded to the base class
     */
    public WindowRankITCase(StreamingWithStateTestBase.StateBackendMode mode) {
        super(mode);
    }
}

