/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.plan.stream.sql.agg;

import java.util.Collection;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.planner.plan.stream.sql.agg.WindowAggregateTest$;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
import org.apache.flink.table.planner.utils.AggregatePhaseStrategy;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.reflect.ScalaSignature;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\t]a\u0001B\u0001\u0003\u0001U\u00111cV5oI><\u0018iZ4sK\u001e\fG/\u001a+fgRT!a\u0001\u0003\u0002\u0007\u0005<wM\u0003\u0002\u0006\r\u0005\u00191/\u001d7\u000b\u0005\u001dA\u0011AB:ue\u0016\fWN\u0003\u0002\n\u0015\u0005!\u0001\u000f\\1o\u0015\tYA\"A\u0004qY\u0006tg.\u001a:\u000b\u00055q\u0011!\u0002;bE2,'BA\b\u0011\u0003\u00151G.\u001b8l\u0015\t\t\"#\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002'\u0005\u0019qN]4\u0004\u0001M\u0011\u0001A\u0006\t\u0003/ii\u0011\u0001\u0007\u0006\u00033)\tQ!\u001e;jYNL!a\u0007\r\u0003\u001bQ\u000b'\r\\3UKN$()Y:f\u0011!i\u0002A!A!\u0002\u0013q\u0012\u0001E1hOBC\u0017m]3F]\u001a|'oY3s!\t9r$\u0003\u0002!1\t1\u0012iZ4sK\u001e\fG/\u001a)iCN,7\u000b\u001e:bi\u0016<\u0017\u0010C\u0003#\u0001\u0011\u00051%\u0001\u0004=S:LGO\u0010\u000b\u0003I\u0019\u0002\"!\n\u0001\u000e\u0003\tAQ!H\u0011A\u0002yAq\u0001\u000b\u0001C\u0002\u0013%\u0011&\u0001\u0003vi&dW#\u0001\u0016\u0011\u0005]Y\u0013B\u0001\u0017\u0019\u0005M\u0019FO]3b[R\u000b'\r\\3UKN$X\u000b^5m\u0011\u0019q\u0003\u0001)A\u0005U\u0005)Q\u000f^5mA!9\u0001\u0007\u0001b\u0001\n\u0013\t\u0014AC5t)^|\u0007\u000b[1tKV\t!\u0007\u0005\u00024m5\tAGC\u00016\u0003\u0015\u00198-\u00197b\u0013\t9DGA\u0004C_>dW-\u00198\t\re\u0002\u0001\u0015!\u00033\u0003-I7\u000fV<p!\"\f7/\u001a\u0011\t\u000bm\u0002A\u0011\u0001\u001f\u0002\r\t,gm\u001c:f)\u0005i\u0004CA\u001a?\u0013\tyDG\u0001\u0003V]&$\bF\u0001\u001eB!\t\u0011U)D\u0001D\u0015\t!%#A\u0003kk:LG/\u0003\u0002G\u0007\n1!)\u001a4pe\u0016DQ\u0001\u0013\u0001\u0005\u0002q\nA\u0003^3tiR+XN\u00197f?>s'k\\<uS6,\u0007FA$K!\t\u00115*\u0003\u0002M\u0007\n!A+Z:u\u0011\u0015q\u0005\u0001\"\u0001=\u0003U!Xm\u001d;Uk6\u0014G.Z0P]B\u0013xn\u0019;j[\u0016D#!\u0014&\t\u000bE\u0003A\u0011\u0001\u001f\u0002)Q,7\u000f\u001e+v[\ndWmX\"bY\u000e|e\u000e\u0016,GQ\t\u0001&\nC\u0003U\u0001\u0011\u0005A(A\u000fuKN$H+^7cY\u0016|v+\u001b8e_^\u001cu\u000e\\;n]N\fE/\u00128eQ\t\u0019&\nC\u0003X\u0001\u0011\u0005A(A\u0013uKN$H+^7cY\u0016|vI]8va6+H\u000e^5qY\u0016<\u0016N\u001c3po\u000e{G.^7og\"\u0012aK\u0013\u0005\u00065\u0002!\t\u0001P\u0001\u001di\u0016\u001cH\u000fV;nE2,wl\u0012:pkBlU\u000f\u001c;ja2,7*Z=tQ\tI&\nC\u0003^\u0001\u0011\u0005A(A\u0011uKN$H+^7cY\u0016|vI]8va>sG._,j]\u0012|woQ8mk6t7\u000f\u000b\u0002]\u0015\")\u0001\r\u0001C\u0001y\u0005qB/Z:u)Vl'\r\\3`\u000fJ|W\u000f](o\u0019&$XM]1m-\u0006dW/\u001a\u0015\u0003?*CQa\u0019\u0001\u0005\u0002q\nQ\u0004^3tiR+XN\u00197f?B\u0013xN[3di&|g\u000eU;tQ\u0012{wO\u001c\u0015\u0003E*CQA\u001a\u0001\u0005\u0002q\n!\u0004^3tiR+XN\u00197f?\u000e\u000b7oY1eS:<w+\u001b8e_^D#!\u001a&\t\u000b%\u0004A\u0011\u0001\u001f\u0002?Q,7\u000f\u001e+v[\ndWm\u0018#jgRLgn\u0019;Ta2LG/\u00128bE2,G\r\u000b\u0002i\u0015\")A\u000e\u0001C\u0001y\u0005\u0011C/Z:u)Vl'\r\\3`\t&\u001cH/\u001b8di>sw+\u001b8e_^\u001cu\u000e\\;n]ND#a\u001b&\t\u000b=\u0004A\u0011\u0001\u001f\u0002SQ,7\u000f\u001e+v[\ndWm\u0018#p\u001d>$8\u000b\u001d7jiB\u0013xnY3tg&tw\rV5nK^Kg\u000eZ8xQ\tq'\nC\u0003s\u0001\u0011\u0005A(A\u0011uKN$H+^7cY\u0016|fj\u001c;PkR\u0004X\u000f^,j]\u0012|woQ8mk6t7\u000f\u000b\u0002r\u0015\")Q\u000f\u0001C\u0001y\u0005YB/Z:u)Vl'\r\\3`+\u0012\fgmV5uQ>,H/T3sO\u0016D#\u0001\u001e&\t\u000ba\u0004A\u0011\u0001\u001f\u0002-Q,7\u000f^\"v[Vd\u0017\r^3`\u001f:\u0014vn\u001e;j[\u0016D#a\u001e&\t\u000bm\u0004A\u0011\u0001\u001f\u0002/Q,7\u000f^\"v[Vd\u0017\r^3`\u001f:\u0004&o\\2uS6,\u0007F\u0001>K\u0011\u0015q\b\u0001\"\u0001=\u0003\u0005\"Xm\u001d;Dk6,H.\u0019;f?\u0012K7\u000f^5oGR\u001c\u0006\u000f\\5u\u000b:\f'\r\\3eQ\ti(\n\u0003\u0004\u0002\u0004\u0001!\t\u0001P\u0001\u0012i\u0016\u001cH\u000fS8q?>s'k\\<uS6,\u0007fAA\u0001\u0015\"1\u0011\u0011\u0002\u0001\u0005\u0002q\n!\u0003^3ti\"{\u0007oX(o!J|7\r^5nK\"\u001a\u0011q\u0001&\t\r\u0005=\u0001\u0001\"\u0001=\u0003q!Xm\u001d;I_B|F)[:uS:\u001cGo\u00159mSR,e.\u00192mK\u0012D3!!\u0004K\u0011\u0019\t)\u0002\u0001C\u0001y\u0005!C/Z:u\u001bVdG/\u001b9mK\u0006;wM]3hCR,wJ\\*b[\u0016<\u0016N\u001c3poR3f\tK\u0002\u0002\u0014)Ca!a\u0007\u0001\t\u0003a\u0014A\u000b;fgR\u001c\u0015M\u001c;NKJ<WmV5oI><HK\u0016$`\r&dG/\u001a:P]^Kg\u000eZ8x'R\f'\u000f\u001e\u0015\u0004\u00033Q\u0005BBA\u0011\u0001\u0011\u0005A(\u0001\u0014uKN$8)\u00198u\u001b\u0016\u0014x-Z,j]\u0012|w\u000f\u0016,G?V#GOZ(o/&tGm\\<U-\u001aC3!a\bK\u0011\u0019\t9\u0003\u0001C\u0001y\u0005iC/Z:u\u0007\u0006tG\u000f\u0016:b]Nd\u0017\r^3U_^Kg\u000eZ8x\u0003\u001e<wl\u0012:pkB|en\u00148msN#\u0018M\u001d;)\u0007\u0005\u0015\"\n\u0003\u0004\u0002.\u0001!\t\u0001P\u00011i\u0016\u001cHoQ1oiR\u0013\u0018M\\:mCR,Gk\\,j]\u0012|w/Q4h?BKH\u000f[8o\u0003\u001e<'/Z4bi\u0016\u001c\u0015\r\u001c7)\u0007\u0005-\"\n\u0003\u0004\u00024\u0001!\t\u0001P\u0001#i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI\u0016C8-\u001a9uS>tw,R1sYf4\u0015N]3)\u0007\u0005E\"\n\u0003\u0004\u0002:\u0001!\t\u0001P\u0001\"i\u0016\u001cH/\u00168tkB\u0004xN\u001d;fI\u0016C8-\u001a9uS>tw\fT1uK\u001aK'/\u001a\u0015\u0004\u0003oQ\u0005BBA \u0001\u0011\u0005A(\u0001\u0017uKN$XK\\:vaB|'\u000f^3e\u000bb\u001cW\r\u001d;j_:|\u0006j\u001c9TSj,gj\u001c8ESZL7/\u001b2mK\"\u001a\u0011Q\b&\t\r\u0005\u0015\u0003\u0001\"\u0001=\u0003E\"Xm\u001d;V]N,\b\u000f]8si\u0016$W\t_2faRLwN\\0Dk6,H.\u0019;f'&TXMT8o\t&4\u0018n]5cY\u0016D3!a\u0011K\u0011\u0019\tY\u0005\u0001C\u0001y\u0005qD/Z:u\u0007\u0006tG\u000f\u0016:b]Nd\u0017\r^3U_^Kg\u000eZ8x\u0003\u001e<wl\u0012:pkBLgnZ*fiN<\u0016\u000e\u001e5pkR<\u0016N\u001c3poN#\u0018M\u001d;F]\u0012D3!!\u0013K\u0011\u0019\t\t\u0006\u0001C\u0001y\u0005aD/Z:u\u0007\u0006tG\u000f\u0016:b]Nd\u0017\r^3U_^Kg\u000eZ8x\u0003\u001e<wl\u0012:pkBLgnZ*fiN|e\u000e\\=XSRDw+\u001b8e_^\u001cF/\u0019:uQ\r\tyE\u0013\u0005\u0007\u0003/\u0002A\u0011\u0001\u001f\u0002/Q,7\u000f\u001e+v[\ndWmX$s_V\u0004\u0018N\\4TKR\u001c\bfAA+\u0015\"1\u0011Q\f\u0001\u0005\u0002q\n\u0001\u0004^3tiR+XN\u00197f?\u001e\u0013x.\u001e9j]\u001e\u001cV\r^:2Q\r\tYF\u0013\u0005\u0007\u0003G\u0002A\u0011\u0001\u001f\u0002WQ,7\u000f\u001e+v[\ndWmX$s_V\u0004\u0018N\\4TKR\u001cH)[:uS:\u001cGo\u00159mSR,e.\u00192mK\u0012D3!!\u0019K\u0011\u0019\tI\u0007\u0001C\u0001y\u00051D/Z:u\u0007\u0006tG\u000f\u0016:b]Nd\u0017\r^3U_^Kg\u000eZ8x\u0003\u001e<wlQ;cK^KG\u000f[8vi^Kg\u000eZ8x'R\f'\u000f^#oI\"\u001a\u0011q\r&\t\r\u0005=\u0004\u0001\"\u0001=\u0003a\"Xm\u001d;DC:$HK]1og2\fG/\u001a+p/&tGm\\<BO\u001e|&k\u001c7mkB<\u0016\u000e\u001e5pkR<\u0016N\u001c3poN#\u0018M\u001d;F]\u0012D3!!\u001cK\u0011\u0019\t)\b\u0001C\u0001y\u0005\tB/Z:u)Vl'\r\\3`%>dG.\u001e9)\u0007\u0005M$\n\u0003\u0004\u0002|\u0001!\t\u0001P\u0001;i\u0016\u001cHoQ1oi6+'oZ3XS:$wn\u001e+W\r~;%o\\;qS:<7+\u001a;t\t&\u001cH/\u001b8di>sw+\u001b8e_^\u001cu\u000e\\;n]ND3!!\u001fK\u0011\u0019\t\t\t\u0001C\u0001y\u0005!B/Z:u\u0011>\u0004xl\u0012:pkBLgnZ*fiND3!a K\u0011\u0019\t9\t\u0001C\u0001y\u0005IC/Z:u\u0011>\u0004xl\u0012:pkBLgnZ*fiN|F)[:uS:\u001cGo\u00159mSR,e.\u00192mK\u0012D3!!\"K\u0011\u0019\ti\t\u0001C\u0001y\u0005aA/Z:u\u0011>\u0004xlQ;cK\"\u001a\u00111\u0012&\t\r\u0005M\u0005\u0001\"\u0001=\u00039!Xm\u001d;I_B|&k\u001c7mkBD3!!%K\u0011\u0019\tI\n\u0001C\u0001y\u0005IB/Z:u\u0007VlW\u000f\\1uK~;%o\\;qS:<7+\u001a;tQ\r\t9J\u0013\u0005\u0007\u0003?\u0003A\u0011\u0001\u001f\u0002]Q,7\u000f^\"v[Vd\u0017\r^3`\u000fJ|W\u000f]5oON+Go]0ESN$\u0018N\\2u'Bd\u0017\u000e^#oC\ndW\r\u001a\u0015\u0004\u0003;S\u0005BBAS\u0001\u0011\u0005A(A\tuKN$8)^7vY\u0006$XmX\"vE\u0016D3!a)K\u0011\u0019\tY\u000b\u0001C\u0001y\u0005\u0019B/Z:u\u0007VlW\u000f\\1uK~\u0013v\u000e\u001c7va\"\u001a\u0011\u0011\u0016&)\u000f\u0001\t\t,!0\u0002@B!\u00111WA]\u001b\t\t)LC\u0002\u00028\u000e\u000baA];o]\u0016\u0014\u0018\u0002BA^\u0003k\u0013qAU;o/&$\b.A\u0003wC2,Xm\t\u0002\u0002BB!\u00111YAe\u001b\t\t)MC\u0002\u0002H\u000e\u000bqA];o]\u0016\u00148/\u0003\u0003\u0002L\u0006\u0015'!\u0004)be\u0006lW\r^3sSj,GmB\u0004\u0002P\nA\t!!5\u0002']Kg\u000eZ8x\u0003\u001e<'/Z4bi\u0016$Vm\u001d;\u0011\u0007\u0015\n\u0019N\u0002\u0004\u0002\u0005!\u0005\u0011Q[\n\u0005\u0003'\f9\u000eE\u00024\u00033L1!a75\u0005\u0019\te.\u001f*fM\"9!%a5\u0005\u0002\u0005}GCAAi\u0011!\t\u0019/a5\u0005\u0002\u0005\u0015\u0018A\u00039be\u0006lW\r^3sgR\u0011\u0011q\u001d\t\u0007\u0003S\f\t0!>\u000e\u0005\u0005-(b\u0001\u0015\u0002n*\u0011\u0011q^\u0001\u0005U\u00064\u0018-\u0003\u0003\u0002t\u0006-(AC\"pY2,7\r^5p]B)1'a>\u0002|&\u0019\u0011\u0011 \u001b\u0003\u000b\u0005\u0013(/Y=\u0011\u0007M\ni0C\u0002\u0002\u0000R\u00121!\u00118zQ!\t\tOa\u0001\u0003\u0012\tM\u0001\u0003\u0002B\u0003\u0005\u0017qA!a1\u0003\b%!!\u0011BAc\u00035\u0001\u0016M]1nKR,'/\u001b>fI&!!Q\u0002B\b\u0005)\u0001\u0016M]1nKR,'o\u001d\u0006\u0005\u0005\u0013\t)-\u0001\u0003oC6,\u0017E\u0001B\u000b\u0003Q\twm\u001a)iCN,WI\u001c4pe\u000e,'/P>1{\u0002")
public class WindowAggregateTest
extends TableTestBase {
    private final AggregatePhaseStrategy aggPhaseEnforcer;
    private final StreamTableTestUtil util;
    private final boolean isTwoPhase;

    @Parameterized.Parameters(name="aggPhaseEnforcer={0}")
    public static Collection<Object[]> parameters() {
        return WindowAggregateTest$.MODULE$.parameters();
    }

    private StreamTableTestUtil util() {
        return this.util;
    }

    private boolean isTwoPhase() {
        return this.isTwoPhase;
    }

    @Before
    public void before() {
        this.util().addTemporarySystemFunction("weightedAvg", JavaUserDefinedAggFunctions.WeightedAvgWithMerge.class);
        this.util().addTemporarySystemFunction("weightedAvgWithoutMerge", JavaUserDefinedAggFunctions.WeightedAvg.class);
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString(new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"\n         |CREATE TABLE MyTable (\n         |  a INT,\n         |  b BIGINT,\n         |  c STRING NOT NULL,\n         |  d DECIMAL(10, 3),\n         |  e BIGINT,\n         |  rowtime TIMESTAMP(3),\n         |  proctime as PROCTIME(),\n         |  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND\n         |) with (\n         |  'connector' = 'values'\n         |)\n         |"})).s((Seq)Nil$.MODULE$))).stripMargin());
        this.util().tableEnv().getConfig().getConfiguration().setString(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, this.aggPhaseEnforcer.toString());
    }

    @Test
    public void testTumble_OnRowtime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_OnProctime() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_CalcOnTVF() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM (\n        |  SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, a\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE b > 1000\n        |)\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_WindowColumnsAtEnd() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv,\n        |   window_start,\n        |   window_end\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupMultipleWindowColumns() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   ws,\n        |   window_end,\n        |   window_time,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM (\n        |  SELECT *, window_start as ws\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |)\n        |GROUP BY a, window_start, window_end, ws, window_time\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupMultipleKeys() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY window_start, a, window_end, b\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupOnlyWindowColumns() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupOnLiteralValue() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY 'literal', window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_ProjectionPushDown() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d)\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_CascadingWindow() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW window1 AS\n        |SELECT\n        |   a,\n        |   b,\n        |   window_time as rowtime,\n        |   count(*) as cnt,\n        |   sum(d) as sum_d,\n        |   max(d) as max_d\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE))\n        |GROUP BY a, window_start, window_end, window_time, b\n      ")).stripMargin());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  a,\n        |  window_start,\n        |  window_end,\n        |  sum(cnt),\n        |  sum(sum_d),\n        |  max(max_d)\n        |FROM TABLE(TUMBLE(TABLE window1, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE))\n        |GROUP BY a, window_start, window_end\n        |")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_DistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_DistinctOnWindowColumns() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct window_time) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_DoNotSplitProcessingTimeWindow() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_NotOutputWindowColumns() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_UdafWithoutMerge() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvgWithoutMerge(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_OnRowtime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_OnProctime() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_DistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_OnRowtime() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_OnProctime() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_DistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testMultipleAggregateOnSameWindowTVF() {
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE VIEW tvf AS\n        |SELECT * FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |")).stripMargin());
        StatementSet statementSet = this.util().tableEnv().createStatementSet();
        this.util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |CREATE TABLE s1 (\n        |  wstart TIMESTAMP(3),\n        |  wend TIMESTAMP(3),\n        |  `result` BIGINT\n        |) WITH (\n        |  'connector' = 'values'\n        |)\n        |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO s1\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   weightedAvg(b, e) AS wAvg\n        |FROM tvf\n        |GROUP BY window_start, window_end\n        |")).stripMargin());
        statementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO s1\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   count(*)\n        |FROM tvf\n        |GROUP BY window_start, window_end\n        |")).stripMargin());
        this.util().verifyExecPlan(statementSet);
    }

    @Test
    public void testCantMergeWindowTVF_FilterOnWindowStart() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM (\n        |  SELECT window_start, rowtime, d, proctime, e, b, c, window_end, window_time, a\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |  WHERE window_start >= TIMESTAMP '2021-01-01 10:10:00.000'\n        |)\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantMergeWindowTVF_UdtfOnWindowTVF() {
        this.util().tableEnv().createTemporaryFunction("len_udtf", JavaUserDefinedTableFunctions.JavaTableFunc1.class);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(len),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM (\n        |  SELECT *\n        |  FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)),\n        |  LATERAL TABLE(len_udtf(c)) AS T(len)\n        |)\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_GroupOnOnlyStart() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY window_start\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_PythonAggregateCall() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        this.util().tableEnv().createTemporaryFunction("python_agg", JavaUserDefinedAggFunctions.TestPythonAggregateFunction.class);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   window_start,\n        |   window_end,\n        |   python_agg(1, 1)\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testUnsupportedException_EarlyFire() {
        Configuration conf = new Configuration();
        conf.setString(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED().key(), "true");
        conf.setString(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_EARLY_FIRE_DELAY().key(), "5s");
        this.util().tableEnv().getConfig().addConfiguration(conf);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Currently, window table function based aggregate doesn't support early-fire and late-fire configuration 'table.exec.emit.early-fire.enabled' and 'table.exec.emit.late-fire.enabled'.");
        this.util().verifyExecPlan(sql);
    }

    @Test
    public void testUnsupportedException_LateFire() {
        Configuration conf = new Configuration();
        conf.setString(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_ENABLED().key(), "true");
        conf.setString(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY().key(), "5s");
        this.util().tableEnv().getConfig().addConfiguration(conf);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   weightedAvg(b, e) AS wAvg,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("Currently, window table function based aggregate doesn't support early-fire and late-fire configuration 'table.exec.emit.early-fire.enabled' and 'table.exec.emit.late-fire.enabled'.");
        this.util().verifyExecPlan(sql);
    }

    @Test
    public void testUnsupportedException_HopSizeNonDivisible() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*)\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '4' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("HOP table function based aggregate requires size must be an integral multiple of slide, but got size 600000 ms and slide 240000 ms");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testUnsupportedException_CumulateSizeNonDivisible() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   window_start,\n        |   window_end,\n        |   count(*)\n        |FROM TABLE(\n        |   CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY a, window_start, window_end\n      ")).stripMargin();
        this.thrown().expect(TableException.class);
        this.thrown().expectMessage("CUMULATE table function based aggregate requires maxSize must be an integral multiple of step, but got maxSize 3600000 ms and step 1500000 ms");
        this.util().verifyExplain(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_GroupingSetsWithoutWindowStartEnd() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY GROUPING SETS ((a), (window_start), (window_end))\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_GroupingSetsOnlyWithWindowStart() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY GROUPING SETS ((a, window_start), (window_start))\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupingSets() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY GROUPING SETS ((a, window_start, window_end), (b, window_start, window_end))\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupingSets1() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_GroupingSetsDistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(*),\n        |   sum(d),\n        |   max(d) filter (where b > 1000),\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_CubeWithoutWindowStartEnd() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY CUBE (a, b, window_start, window_end)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantTranslateToWindowAgg_RollupWithoutWindowStartEnd() {
        Assume.assumeTrue((boolean)this.isTwoPhase());
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY ROLLUP (a, b, window_start, window_end)\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testTumble_Rollup() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n        |GROUP BY ROLLUP (a, b), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCantMergeWindowTVF_GroupingSetsDistinctOnWindowColumns() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n      |SELECT\n      |   a,\n      |   b,\n      |   count(*),\n      |   max(d) filter (where b > 1000),\n      |   count(distinct window_time) AS uv\n      |FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))\n      |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_GroupingSets() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |   HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_GroupingSets_DistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(*),\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_Cube() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY CUBE (a, b), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testHop_Rollup() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '5' MINUTE, INTERVAL '10' MINUTE))\n        |GROUP BY ROLLUP (a, b), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_GroupingSets() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |   CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_GroupingSets_DistinctSplitEnabled() {
        this.util().tableEnv().getConfig().getConfiguration().setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(*),\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY GROUPING SETS ((a), (b)), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_Cube() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY CUBE (a, b), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    @Test
    public void testCumulate_Rollup() {
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |   a,\n        |   b,\n        |   count(distinct c) AS uv\n        |FROM TABLE(\n        |  CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))\n        |GROUP BY ROLLUP (a, b), window_start, window_end\n      ")).stripMargin();
        this.util().verifyRelPlan(sql);
    }

    public WindowAggregateTest(AggregatePhaseStrategy aggPhaseEnforcer) {
        this.aggPhaseEnforcer = aggPhaseEnforcer;
        this.util = this.streamTestUtil(this.streamTestUtil$default$1());
        AggregatePhaseStrategy aggregatePhaseStrategy = aggPhaseEnforcer;
        AggregatePhaseStrategy aggregatePhaseStrategy2 = AggregatePhaseStrategy.TWO_PHASE;
        this.isTwoPhase = !(aggregatePhaseStrategy != null ? !aggregatePhaseStrategy.equals(aggregatePhaseStrategy2) : aggregatePhaseStrategy2 != null);
    }
}

