/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.runtime.stream.sql;

import java.time.Duration;
import java.time.ZoneId;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.scala.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions;
import org.apache.flink.table.planner.plan.utils.WindowEmitStrategy$;
import org.apache.flink.table.planner.runtime.stream.sql.GroupWindowITCase$;
import org.apache.flink.table.planner.runtime.utils.StreamingWithStateTestBase;
import org.apache.flink.table.planner.runtime.utils.TestData$;
import org.apache.flink.table.planner.runtime.utils.TestingAppendSink;
import org.apache.flink.table.planner.runtime.utils.TestingRetractSink;
import org.apache.flink.table.planner.runtime.utils.TestingUpsertTableSink;
import org.apache.flink.table.planner.runtime.utils.TimeTestUtil;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.types.Row;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Function1;
import scala.Predef$;
import scala.Serializable;
import scala.StringContext;
import scala.Symbol;
import scala.Symbol$;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.IndexedSeq$;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.StringBuilder;
import scala.math.Ordering;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;

@RunWith(value=Parameterized.class)
@ScalaSignature(bytes="\u0006\u0001\u0005\u0005e\u0001B\u0001\u0003\u0001M\u0011\u0011c\u0012:pkB<\u0016N\u001c3po&#6)Y:f\u0015\t\u0019A!A\u0002tc2T!!\u0002\u0004\u0002\rM$(/Z1n\u0015\t9\u0001\"A\u0004sk:$\u0018.\\3\u000b\u0005%Q\u0011a\u00029mC:tWM\u001d\u0006\u0003\u00171\tQ\u0001^1cY\u0016T!!\u0004\b\u0002\u000b\u0019d\u0017N\\6\u000b\u0005=\u0001\u0012AB1qC\u000eDWMC\u0001\u0012\u0003\ry'oZ\u0002\u0001'\t\u0001A\u0003\u0005\u0002\u001615\taC\u0003\u0002\u0018\r\u0005)Q\u000f^5mg&\u0011\u0011D\u0006\u0002\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/\u001a\u0005\t7\u0001\u0011\t\u0011)A\u00059\u0005!Qn\u001c3f!\ti\u0012G\u0004\u0002\u001f_9\u0011qD\f\b\u0003A5r!!\t\u0017\u000f\u0005\tZcBA\u0012+\u001d\t!\u0013F\u0004\u0002&Q5\taE\u0003\u0002(%\u00051AH]8pizJ\u0011!E\u0005\u0003\u001fAI!!\u0004\b\n\u0005-a\u0011BA\u0005\u000b\u0013\t9\u0001\"\u0003\u0002\u0018\r%\u0011\u0001GF\u0001\u001b'R\u0014X-Y7j]\u001e<\u0016\u000e\u001e5Ti\u0006$X\rV3ti\n\u000b7/Z\u0005\u0003eM\u0012\u0001c\u0015;bi\u0016\u0014\u0015mY6f]\u0012lu\u000eZ3\u000b\u0005A2\u0002\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011\u0002\u001c\u0002\u001fU\u001cX\rV5nKN$\u0018-\u001c9Mij\u0004\"a\u000e\u001e\u000e\u0003aR\u0011!O\u0001\u0006g\u000e\fG.Y\u0005\u0003wa\u0012qAQ8pY\u0016\fg\u000eC\u0003>\u0001\u0011\u0005a(\u0001\u0004=S:LGO\u0010\u000b\u0004\u007f\u0005\u0013\u0005C\u0001!\u0001\u001b\u0005\u0011\u0001\"B\u000e=\u0001\u0004a\u0002\"B\u001b=\u0001\u00041\u0004b\u0002#\u0001\u0005\u0004%\t!R\u0001\u000e'\"\u000bej\u0012%B\u0013~SvJT#\u0016\u0003\u0019\u0003\"a\u0012'\u000e\u0003!S!!\u0013&\u0002\tQLW.\u001a\u0006\u0002\u0017\u0006!!.\u0019<b\u0013\ti\u0005J\u0001\u0004[_:,\u0017\n\u001a\u0005\u0007\u001f\u0002\u0001\u000b\u0011\u0002$\u0002\u001dMC\u0015IT$I\u0003&{&l\u0014(FA!)\u0011\u000b\u0001C!%\u00061!-\u001a4pe\u0016$\u0012a\u0015\t\u0003oQK!!\u0016\u001d\u0003\tUs\u0017\u000e\u001e\u0005\u0006/\u0002!\tAU\u0001\u001bi\u0016\u001cH/\u0012<f]R$\u0016.\\3TY&
$\u0017N\\4XS:$wn\u001e\u0015\u0003-f\u0003\"AW/\u000e\u0003mS!\u0001\u0018\t\u0002\u000b),h.\u001b;\n\u0005y[&\u0001\u0002+fgRDQ\u0001\u0019\u0001\u0005\u0002I\u000b\u0011\u0004^3ti\u000e\u000b7oY1eS:<G+^7cY\u0016<\u0016N\u001c3po\"\u0012q,\u0017\u0005\u0006G\u0002!\tAU\u0001\u001di\u0016\u001cH/T5o\u001b\u0006Dx+\u001b;i)Vl'\r\\5oO^Kg\u000eZ8xQ\t\u0011\u0017\fC\u0003g\u0001\u0011\u0005!+\u0001\u0012uKN$x+\u001b8e_^\fum\u001a:fO\u0006$Xm\u00148D_:\u001cH/\u00198u-\u0006dW/\u001a\u0015\u0003KfCQ!\u001b\u0001\u0005\u0002)\fA\u0004^3tiB\u0013xn\u0019;j[\u0016\u001c\u0015m]2bI\u0016<\u0016N\u001c3po\u0006;w-F\u0001TQ\tA\u0017\fC\u0003n\u0001\u0011\u0005!+\u0001\u000euKN$XI^3oiRKW.Z*fgNLwN\\,j]\u0012|w\u000f\u000b\u0002m3\")\u0001\u000f\u0001C\u0001%\u0006aC/Z:u\u000bZ,g\u000e\u001e+j[\u0016$V/\u001c2mS:<w+\u001b8e_^<\u0016\u000e\u001e5BY2|w\u000fT1uK:,7o\u001d\u0015\u0003_fCQa\u001d\u0001\u0005\u0002I\u000bQ\u0007^3ti\u0012K7\u000f^5oGR\fumZ,ji\"lUM]4f\u001f:,e/\u001a8u)&lWmU3tg&|gn\u0012:pkB<\u0016N\u001c3po\"\u0012!/\u0017\u0005\u0006m\u0002!Ia^\u0001\u0012o&$\b\u000eT1uK\u001aK'/\u001a#fY\u0006LH\u0003B*y\u0003\u0003AQ!_;A\u0002i\f1\u0002^1cY\u0016\u001cuN\u001c4jOB\u00111P`\u0007\u0002y*\u0011QPC\u0001\u0004CBL\u0017BA@}\u0005-!\u0016M\u00197f\u0007>tg-[4\t\u000f\u0005\rQ\u000f1\u0001\u0002\u0006\u0005A\u0011N\u001c;feZ\fG\u000e\u0005\u0003\u0002\b\u0005EQBAA\u0005\u0015\rI\u00151\u0002\u0006\u0005\u0003\u001b\ty!\u0001\u0004d_6lwN\u001c\u0006\u0003{2IA!a\u0005\u0002\n\t!A+[7fQ\u001d\u0001\u0011qCA\u0012\u0003K\u0001B!!\u0007\u0002 
5\u0011\u00111\u0004\u0006\u0004\u0003;Y\u0016A\u0002:v]:,'/\u0003\u0003\u0002\"\u0005m!a\u0002*v]^KG\u000f[\u0001\u0006m\u0006dW/Z\u0012\u0003\u0003O\u0001B!!\u000b\u000205\u0011\u00111\u0006\u0006\u0004\u0003[Y\u0016a\u0002:v]:,'o]\u0005\u0005\u0003c\tYCA\u0007QCJ\fW.\u001a;fe&TX\rZ\u0004\b\u0003k\u0011\u0001\u0012AA\u001c\u0003E9%o\\;q/&tGm\\<J)\u000e\u000b7/\u001a\t\u0004\u0001\u0006ebAB\u0001\u0003\u0011\u0003\tYd\u0005\u0003\u0002:\u0005u\u0002cA\u001c\u0002@%\u0019\u0011\u0011\t\u001d\u0003\r\u0005s\u0017PU3g\u0011\u001di\u0014\u0011\bC\u0001\u0003\u000b\"\"!a\u000e\t\u0011\u0005%\u0013\u0011\bC\u0001\u0003\u0017\n!\u0002]1sC6,G/\u001a:t)\t\ti\u0005\u0005\u0004\u0002P\u0005U\u0013\u0011L\u0007\u0003\u0003#R1!a\u0015K\u0003\u0011)H/\u001b7\n\t\u0005]\u0013\u0011\u000b\u0002\u000b\u0007>dG.Z2uS>t\u0007#B\u001c\u0002\\\u0005}\u0013bAA/q\t)\u0011I\u001d:bsB!\u0011\u0011MA4\u001b\t\t\u0019GC\u0002\u0002f)\u000bA\u0001\\1oO&!\u0011\u0011NA2\u0005\u0019y%M[3di\"B\u0011qIA7\u0003w\ni\b\u0005\u0003\u0002p\u0005Ud\u0002BA\u0015\u0003cJA!a\u001d\u0002,\u0005i\u0001+\u0019:b[\u0016$XM]5{K\u0012LA!a\u001e\u0002z\tQ\u0001+\u0019:b[\u0016$XM]:\u000b\t\u0005M\u00141F\u0001\u0005]\u0006lW-\t\u0002\u0002\u0000\u000593\u000b^1uK\n\u000b7m[3oIvZ\b' \u0017!+N,G+[7fgR\fW\u000e\u001d'uu\u0002j\u0004e_\u0019~\u0001")
public class GroupWindowITCase
extends StreamingWithStateTestBase {
    // Test parameter: when true, before() declares the `ts` column as BIGINT and derives
    // `rowtime` with TO_TIMESTAMP_LTZ; when false, `ts` is STRING and TO_TIMESTAMP is used.
    // Several tests also return early when it is true.
    private final boolean useTimestampLtz;
    // Zone returned by SHANGHAI_ZONE() and installed as the session local time zone in before().
    // Its initialization is not visible in this part of the file.
    private final ZoneId SHANGHAI_ZONE;
    // Cached scala.Symbol instances, one per distinct name. The tests pass them as field
    // references when converting a DataStream to a Table.
    // NOTE(review): presumably compiler-generated holders for Scala symbol literals — confirm
    // against the original Scala source.
    private static Symbol symbol$1 = Symbol$.MODULE$.apply("rowtime");
    private static Symbol symbol$2 = Symbol$.MODULE$.apply("int");
    private static Symbol symbol$3 = Symbol$.MODULE$.apply("string");
    private static Symbol symbol$4 = Symbol$.MODULE$.apply("name");
    private static Symbol symbol$5 = Symbol$.MODULE$.apply("long");
    private static Symbol symbol$6 = Symbol$.MODULE$.apply("a");
    private static Symbol symbol$7 = Symbol$.MODULE$.apply("b");
    private static Symbol symbol$8 = Symbol$.MODULE$.apply("c");

    /**
     * Supplies the parameter sets for this parameterized test class.
     *
     * <p>The actual values come from the companion object {@code GroupWindowITCase$}; this
     * static method only forwards to it.
     *
     * @return the collection of parameter arrays produced by the companion object
     */
    @Parameterized.Parameters(name="StateBackend={0}, UseTimestampLtz = {1}")
    public static Collection<Object[]> parameters() {
        final Collection<Object[]> parameterSets = GroupWindowITCase$.MODULE$.parameters();
        return parameterSets;
    }

    /**
     * Accessor for the {@code SHANGHAI_ZONE} field.
     *
     * @return the zone stored in the {@code SHANGHAI_ZONE} field
     */
    public ZoneId SHANGHAI_ZONE() {
        final ZoneId zone = this.SHANGHAI_ZONE;
        return zone;
    }

    /**
     * Per-test setup: registers the test data and creates the {@code testTable} source table.
     *
     * <p>Both data sets are always registered; {@code useTimestampLtz} decides which data id,
     * which {@code ts} column type and which {@code rowtime} expression go into the DDL. The
     * session local time zone is set to {@link #SHANGHAI_ZONE()} before the DDL is executed.
     */
    @Override
    public void before() {
        super.before();
        final String plainDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampData());
        final String ltzDataId = TestValuesTableFactory.registerData(TestData$.MODULE$.timestampLtzData());
        this.tEnv().getConfig().setLocalTimeZone(this.SHANGHAI_ZONE());

        // Pick the three values substituted into the DDL template in one place.
        final String tsColumnType;
        final String rowtimeExpression;
        final String dataId;
        if (this.useTimestampLtz) {
            tsColumnType = "BIGINT";
            rowtimeExpression = "TO_TIMESTAMP_LTZ(`ts`, 3)";
            dataId = ltzDataId;
        } else {
            tsColumnType = "STRING";
            rowtimeExpression = "TO_TIMESTAMP(`ts`)";
            dataId = plainDataId;
        }

        // The template has four literal parts; the three arguments are interleaved between them.
        final String[] ddlParts = new String[]{
            "\n         |CREATE TABLE testTable (\n         | `ts` ",
            ",\n         | `int` INT,\n         | `double` DOUBLE,\n         | `float` FLOAT,\n         | `bigdec` DECIMAL(10, 2),\n         | `string` STRING,\n         | `name` STRING,\n         | `rowtime` AS\n         | ",
            ",\n         | proctime as PROCTIME(),\n         | WATERMARK for `rowtime` AS `rowtime` - INTERVAL '0.01' SECOND\n         |) WITH (\n         | 'connector' = 'values',\n         | 'data-id' = '",
            "',\n         | 'failing-source' = 'true'\n         |)\n         |"
        };
        final Object[] ddlArgs = new Object[]{tsColumnType, rowtimeExpression, dataId};
        final StringContext ddlTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])ddlParts));
        final String interpolated = ddlTemplate.s((Seq)Predef$.MODULE$.genericWrapArray((Object)ddlArgs));
        final String ddl = new StringOps(Predef$.MODULE$.augmentString(interpolated)).stripMargin();
        this.tEnv().executeSql(ddl);
    }

    /**
     * Runs a HOP group-window aggregation over {@code testTable}, grouped by {@code string},
     * and compares the sorted append-sink output with the sorted expected rows.
     *
     * <p>The query uses the user-defined aggregate {@code concat_distinct_agg}, which is
     * registered first. The expected rows differ between the two {@code useTimestampLtz}
     * parameterizations only in how the HOP_ROWTIME column is rendered.
     */
    @Test
    public void testEventTimeSlidingWindow() {
        final AggregateFunction concatDistinct = new JavaUserDefinedAggFunctions.ConcatDistinctAggFunction();
        this.tEnv().registerFunction("concat_distinct_agg", concatDistinct, (TypeInformation)BasicTypeInfo.getInfoFor(String.class), TypeExtractor.createTypeInfo(JavaUserDefinedAggFunctions.ConcatAcc.class));

        final String hopQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  HOP_START(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  HOP_ROWTIME(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  COUNT(DISTINCT `float`),\n        |  concat_distinct_agg(name)\n        |FROM testTable\n        |GROUP BY `string`, HOP(rowtime, INTERVAL '0.004' SECOND, INTERVAL '0.005' SECOND)\n      ")).stripMargin();

        final TestingAppendSink appendSink = new TestingAppendSink();
        final Table hopResult = this.tEnv().sqlQuery(hopQuery);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(hopResult).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)appendSink);
        this.env().execute();

        final String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "Hallo,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.008,1969-12-31T16:00:00.012Z,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.012,1969-12-31T16:00:00.016Z,1,1,1,1,b",
                "Hello world,1970-01-01T00:00:00.016,1969-12-31T16:00:00.020Z,1,1,1,1,b",
                "Hello,1970-01-01T00:00,1969-12-31T16:00:00.004Z,2,2,2,2,a",
                "Hello,1970-01-01T00:00:00.004,1969-12-31T16:00:00.008Z,3,3,3,2,a|b",
                "Hi,1970-01-01T00:00,1969-12-31T16:00:00.004Z,1,1,1,1,a",
                "null,1970-01-01T00:00:00.028,1969-12-31T16:00:00.032Z,1,1,1,1,null",
                "null,1970-01-01T00:00:00.032,1969-12-31T16:00:00.036Z,1,1,1,1,null"
            };
        } else {
            expectedRows = new String[]{
                "Hallo,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.008,1970-01-01T00:00:00.012,1,1,1,1,a",
                "Hello world,1970-01-01T00:00:00.012,1970-01-01T00:00:00.016,1,1,1,1,b",
                "Hello world,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,1,b",
                "Hello,1970-01-01T00:00,1970-01-01T00:00:00.004,2,2,2,2,a",
                "Hello,1970-01-01T00:00:00.004,1970-01-01T00:00:00.008,3,3,3,2,a|b",
                "Hi,1970-01-01T00:00,1970-01-01T00:00:00.004,1,1,1,1,a",
                "null,1970-01-01T00:00:00.028,1970-01-01T00:00:00.032,1,1,1,1,null",
                "null,1970-01-01T00:00:00.032,1970-01-01T00:00:00.036,1,1,1,1,null"
            };
        }
        final Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        // Both sides are sorted so the comparison does not depend on emission order.
        final Object expectedSorted = expected.sorted((Ordering)Ordering.String$.MODULE$);
        final Object actualSorted = appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Runs two cascaded TUMBLE windows over {@code testTable}: the inner query counts rows per
     * {@code int}/{@code string} group and exposes TUMBLE_ROWTIME as {@code ts}; the outer
     * query sums those counts in a TUMBLE window on {@code ts}. The single expected row is
     * {@code "9"}.
     */
    @Test
    public void testCascadingTumbleWindow() {
        final String cascadeQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT SUM(cnt)\n        |FROM (\n        |  SELECT COUNT(1) AS cnt, TUMBLE_ROWTIME(rowtime, INTERVAL '10' SECOND) AS ts\n        |  FROM testTable\n        |  GROUP BY `int`, `string`, TUMBLE(rowtime, INTERVAL '10' SECOND)\n        |)\n        |GROUP BY TUMBLE(ts, INTERVAL '10' SECOND)\n        |")).stripMargin();

        final TestingAppendSink appendSink = new TestingAppendSink();
        final Table cascadeResult = this.tEnv().sqlQuery(cascadeQuery);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(cascadeResult).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)appendSink);
        this.env().execute();

        final String[] expectedRows = new String[]{"9"};
        final Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        final Object expectedSorted = expected.sorted((Ordering)Ordering.String$.MODULE$);
        final Object actualSorted = appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Aggregates MAX/MIN of {@code rowtime} in a 10-second TUMBLE window per
     * {@code string}/{@code int} group, then takes MAX/MIN of those per {@code string} in a
     * non-windowed outer aggregation, collecting the result through a retract sink.
     *
     * <p>Early firing is switched on with a delay of {@code "1000 ms"} before the query is
     * planned. Expected rows differ per {@code useTimestampLtz} parameterization.
     */
    @Test
    public void testMinMaxWithTumblingWindow() {
        final TableConfig tableConfig = this.tEnv().getConfig();
        tableConfig.getConfiguration().setBoolean("table.exec.emit.early-fire.enabled", true);
        tableConfig.getConfiguration().setString("table.exec.emit.early-fire.delay", "1000 ms");

        final String minMaxQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        | MAX(max_ts),\n        | MIN(min_ts),\n        | `string`\n        |FROM(\n        | SELECT\n        | `string`,\n        | `int`,\n        | MAX(rowtime) as max_ts,\n        | MIN(rowtime) as min_ts\n        | FROM testTable\n        | GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))\n        |GROUP BY `string`\n      ")).stripMargin();

        final TestingRetractSink retractSink = new TestingRetractSink();
        final Table minMaxResult = this.tEnv().sqlQuery(minMaxQuery);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(minMaxResult).toRetractStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)retractSink);
        this.env().execute();

        final String[] expectedRows;
        if (this.useTimestampLtz) {
            expectedRows = new String[]{
                "1969-12-31T16:00:00.001Z,1969-12-31T16:00:00.001Z,Hi",
                "1969-12-31T16:00:00.002Z,1969-12-31T16:00:00.002Z,Hallo",
                "1969-12-31T16:00:00.007Z,1969-12-31T16:00:00.003Z,Hello",
                "1969-12-31T16:00:00.016Z,1969-12-31T16:00:00.008Z,Hello world",
                "1969-12-31T16:00:00.032Z,1969-12-31T16:00:00.032Z,null"
            };
        } else {
            expectedRows = new String[]{
                "1970-01-01T00:00:00.001,1970-01-01T00:00:00.001,Hi",
                "1970-01-01T00:00:00.002,1970-01-01T00:00:00.002,Hallo",
                "1970-01-01T00:00:00.007,1970-01-01T00:00:00.003,Hello",
                "1970-01-01T00:00:00.016,1970-01-01T00:00:00.008,Hello world",
                "1970-01-01T00:00:00.032,1970-01-01T00:00:00.032,null"
            };
        }
        final Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));

        final Object expectedSorted = expected.sorted((Ordering)Ordering.String$.MODULE$);
        final Object actualSorted = retractSink.getRetractResults().sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Groups {@code testTable} by the constant {@code 'a'} plus a 3 ms TUMBLE window and
     * checks the emitted (TUMBLE_END, COUNT(name)) rows against five expected rows.
     */
    @Test
    public void testWindowAggregateOnConstantValue() {
        final String constantKeyQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT TUMBLE_END(rowtime, INTERVAL '0.003' SECOND), COUNT(name)\n        |FROM testTable\n        | GROUP BY 'a', TUMBLE(rowtime, INTERVAL '0.003' SECOND)\n      ")).stripMargin();

        final TestingAppendSink appendSink = new TestingAppendSink();
        final Table windowResult = this.tEnv().sqlQuery(constantKeyQuery);
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(windowResult).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)appendSink);
        this.env().execute();

        final String[] expectedRows = new String[]{
            "1970-01-01T00:00:00.003,2",
            "1970-01-01T00:00:00.006,2",
            "1970-01-01T00:00:00.009,3",
            "1970-01-01T00:00:00.018,1",
            "1970-01-01T00:00:00.033,0"
        };
        final Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])expectedRows));
        final Object expectedSorted = expected.sorted((Ordering)Ordering.String$.MODULE$);
        final Object actualSorted = appendSink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$);
        Assert.assertEquals(expectedSorted, actualSorted);
    }

    /**
     * Plans (without executing) a cascaded processing-time TUMBLE aggregation and asserts the
     * textual form of the resulting resolved schema.
     *
     * <p>The query aliases two columns as {@code window_start}; the expected schema lists the
     * second one as {@code window_start0}.
     */
    @Test
    public void testProctimeCascadeWindowAgg() {
        final String cascadeQuery = new StringOps(Predef$.MODULE$.augmentString("\n        | SELECT\n        |  cnt AS key,\n        |  TUMBLE_START(pt1, INTERVAL '0.01' SECOND) AS window_start,\n        |  TUMBLE_END(pt1, INTERVAL '0.01' SECOND) AS window_start,\n        |  TUMBLE_PROCTIME(pt1, INTERVAL '0.01' SECOND) as window_proctime,\n        |  MAX(s1) AS v1,\n        |  MAX(e1) AS v2\n        | FROM\n        | (SELECT\n        |   TUMBLE_START(proctime, INTERVAL '0.005' SECOND) as s1,\n        |   TUMBLE_END(proctime, INTERVAL '0.005' SECOND) e1,\n        |   TUMBLE_PROCTIME(proctime, INTERVAL '0.005' SECOND) as pt1,\n        |   COUNT(name) as cnt\n        |  FROM testTable\n        |  GROUP BY 'a', TUMBLE(proctime, INTERVAL '0.005' SECOND)\n        |  ) as T\n        | GROUP BY cnt, TUMBLE(pt1, INTERVAL '0.01' SECOND)\n      ")).stripMargin();

        final Table cascadeResult = this.tEnv().sqlQuery(cascadeQuery);
        final ResolvedSchema actualSchema = cascadeResult.getResolvedSchema();

        // Single-part template with no arguments, margin-stripped and trimmed.
        final String[] schemaParts = new String[]{"\n         |(\n         |  `key` BIGINT NOT NULL,\n         |  `window_start` TIMESTAMP(3) NOT NULL,\n         |  `window_start0` TIMESTAMP(3) NOT NULL,\n         |  `window_proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME*,\n         |  `v1` TIMESTAMP(3) NOT NULL,\n         |  `v2` TIMESTAMP(3) NOT NULL\n         |)\n         "};
        final StringContext schemaTemplate = new StringContext((Seq)Predef$.MODULE$.wrapRefArray((Object[])schemaParts));
        final String expectedSchema = new StringOps(Predef$.MODULE$.augmentString(schemaTemplate.s((Seq)Nil$.MODULE$))).stripMargin().trim();

        Assert.assertEquals((Object)expectedSchema, (Object)actualSchema.toString());
    }

    /**
     * Checks a SESSION group window (gap {@code INTERVAL '0.005' SECOND}) over an in-memory
     * stream registered as table {@code T1}.
     *
     * <p>Returns immediately, without asserting anything, when {@code useTimestampLtz} is set.
     * Otherwise it feeds six (long, int, String, String) tuples through
     * {@code failingDataSource}, converts the stream to a table with fields
     * {@code rowtime}, {@code int}, {@code string}, {@code name}, runs the query into a
     * {@code TestingAppendSink}, and compares the sorted sink output with three expected rows.
     */
    @Test
    public void testEventTimeSessionWindow() {
        // This case is a no-op for the useTimestampLtz parameterization.
        if (this.useTimestampLtz) {
            return;
        }
        // Input tuples: (timestamp-like long, int, string key, name). They are not in timestamp order.
        List sessionData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple4[]{new Tuple4((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hello", (Object)"a"), new Tuple4((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello", (Object)"b"), new Tuple4((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)8), (Object)"Hello", (Object)"a"), new Tuple4((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)9), (Object)"Hello World", (Object)"b"), new Tuple4((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)4), (Object)"Hello", (Object)"c"), new Tuple4((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)16), (Object)"Hello", (Object)"d")}));
        // The anonymous CaseClassTypeInfo below is the decompiler's rendering of the tuple's type
        // information; it is not valid Java as written (e.g. constructor arguments on anonymous
        // classes, the $anon$4 type name).
        DataStream stream = this.failingDataSource(sessionData, new CaseClassTypeInfo<Tuple4<Object, Object, String, String>>(this){

            // Synthetic accessor that lets the nested closure read the `types` field.
            public /* synthetic */ TypeInformation[] protected$types($anon$4 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) from the field type infos.
            public TypeSerializer<Tuple4<Object, Object, String, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$4 $outer;
                    private final ExecutionConfig executionConfig$1;
                    private final TypeSerializer[] fieldSerializers$1;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Fills slot i with the serializer created by the i-th field's TypeInformation.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$1[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$1);
                    }
                    {
                        // Outer-reference null check as emitted for the closure's captured state.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$1 = executionConfig$1;
                        this.fieldSerializers$1 = fieldSerializers$1;
                    }
                });
                // NOTE(review): this anonymous subclass (overriding createInstance to rebuild a
                // Tuple4 from the field array) is constructed and then discarded; the method
                // returns a plain ScalaCaseClassSerializer on the next statement. This looks
                // like a decompilation artifact — confirm against the original Scala source.
                ScalaCaseClassSerializer<Tuple4<Object, Object, String, String>> unused = new ScalaCaseClassSerializer<Tuple4<Object, Object, String, String>>(this, fieldSerializers){

                    public Tuple4<Object, Object, String, String> createInstance(Object[] fields) {
                        return new Tuple4((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]), (Object)((String)fields[3]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L));
        // Field mapping: the first tuple field becomes the `rowtime` attribute, followed by int, string, name.
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{(Expression)package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime(), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), package$.MODULE$.symbol2FieldExpression(symbol$4)}));
        this.tEnv().registerTable("T1", table);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  SESSION_START(rowtime, INTERVAL '0.005' SECOND),\n        |  SESSION_ROWTIME(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(1),\n        |  SUM(1),\n        |  COUNT(`int`),\n        |  SUM(`int`),\n        |  COUNT(DISTINCT name)\n        |FROM T1\n        |GROUP BY `string`, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sql)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hello World,1970-01-01T00:00:00.009,1970-01-01T00:00:00.013,1,1,1,9,1", "Hello,1970-01-01T00:00:00.016,1970-01-01T00:00:00.020,1,1,1,16,1", "Hello,1970-01-01T00:00:00.001,1970-01-01T00:00:00.012,4,4,4,15,3"}));
        // Both sides are sorted before comparison, so emission order does not matter.
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Checks a 5 ms TUMBLE group window over an in-memory stream registered as {@code T1},
     * writing into an upsert table sink keyed on the first two output columns.
     *
     * <p>Returns immediately, without asserting anything, when {@code useTimestampLtz} is set.
     * Otherwise it sets idle state retention (10 ms / 6 min), calls {@code withLateFireDelay}
     * with a zero delay, feeds eight (long, int, String) tuples through
     * {@code failingDataSource}, runs the aggregation (including the user-defined
     * {@code weightAvgFun}) into sink {@code MySink}, and compares the sorted upsert results
     * with four expected rows, both joined by newlines.
     */
    @Test
    public void testEventTimeTumblingWindowWithAllowLateness() {
        // This case is a no-op for the useTimestampLtz parameterization.
        if (this.useTimestampLtz) {
            return;
        }
        this.tEnv().getConfig().setIdleStateRetentionTime(Time.milliseconds((long)10L), Time.minutes((long)6L));
        // withLateFireDelay is defined outside the visible part of this file.
        this.withLateFireDelay(this.tEnv().getConfig(), Time.of((long)0L, (TimeUnit)TimeUnit.NANOSECONDS));
        // Input tuples: (timestamp-like long, int, string key). They are not in timestamp order.
        List data = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple3[]{new Tuple3((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hi"), new Tuple3((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)4), (Object)"Hello world"), new Tuple3((Object)BoxesRunTime.boxToLong((long)3L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hi")}));
        // The anonymous CaseClassTypeInfo below is the decompiler's rendering of the tuple's type
        // information; it is not valid Java as written (e.g. constructor arguments on anonymous
        // classes, the $anon$5 type name).
        DataStream stream = this.failingDataSource(data, new CaseClassTypeInfo<Tuple3<Object, Object, String>>(this){

            // Synthetic accessor that lets the nested closure read the `types` field.
            public /* synthetic */ TypeInformation[] protected$types($anon$5 x$1) {
                return x$1.types;
            }

            // Builds one serializer per field (index 0 until getArity()) from the field type infos.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$5 $outer;
                    private final ExecutionConfig executionConfig$2;
                    private final TypeSerializer[] fieldSerializers$2;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Fills slot i with the serializer created by the i-th field's TypeInformation.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$2[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$2);
                    }
                    {
                        // Outer-reference null check as emitted for the closure's captured state.
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$2 = executionConfig$2;
                        this.fieldSerializers$2 = fieldSerializers$2;
                    }
                });
                // NOTE(review): this anonymous subclass (overriding createInstance to rebuild a
                // Tuple3 from the field array) is constructed and then discarded; the method
                // returns a plain ScalaCaseClassSerializer on the next statement. This looks
                // like a decompilation artifact — confirm against the original Scala source.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(0L));
        // Field mapping: long, int, string for the three tuple fields, plus a trailing `rowtime` attribute.
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$5), package$.MODULE$.symbol2FieldExpression(symbol$2), package$.MODULE$.symbol2FieldExpression(symbol$3), (Expression)package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime()}));
        this.tEnv().registerTable("T1", table);
        this.tEnv().createTemporarySystemFunction("weightAvgFun", JavaUserDefinedAggFunctions.WeightedAvg.class);
        String sql = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT\n        |  `string`,\n        |  TUMBLE_START(rowtime, INTERVAL '0.005' SECOND) as w_start,\n        |  TUMBLE_END(rowtime, INTERVAL '0.005' SECOND),\n        |  COUNT(DISTINCT `long`),\n        |  COUNT(`int`),\n        |  CAST(AVG(`int`) AS INT),\n        |  weightAvgFun(`long`, `int`),\n        |  MIN(`int`),\n        |  MAX(`int`),\n        |  SUM(`int`)\n        |FROM T1\n        |GROUP BY `string`, TUMBLE(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        Table result = this.tEnv().sqlQuery(sql);
        // Sink column types, one per selected expression in the query above (ten in total).
        TypeInformation[] fieldTypes = (TypeInformation[])((Object[])new TypeInformation[]{Types.STRING(), Types.LOCAL_DATE_TIME(), Types.LOCAL_DATE_TIME(), Types.LONG(), Types.LONG(), Types.INT(), Types.LONG(), Types.INT(), Types.INT(), Types.INT()});
        // Sink column names are generated as "f" + index for every index of fieldTypes.
        String[] fieldNames = (String[])((TraversableOnce)Predef$.MODULE$.refArrayOps((Object[])fieldTypes).indices().map((Function1)new Serializable(this){
            public static final long serialVersionUID = 0L;

            public final String apply(int x$1) {
                return new StringBuilder().append((Object)"f").append((Object)BoxesRunTime.boxToInteger((int)x$1)).toString();
            }
        }, IndexedSeq$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.apply(String.class));
        // The sink is constructed with column indices {0, 1}: the string column and the window start.
        TestingUpsertTableSink sink = new TestingUpsertTableSink(new int[]{0, 1}).configure(fieldNames, (TypeInformation<?>[])fieldTypes);
        ((TableEnvironmentInternal)this.tEnv()).registerTableSinkInternal("MySink", (TableSink)sink);
        // await() is called on the insert's result before the sink is read.
        result.executeInsert("MySink").await();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hi,1970-01-01T00:00,1970-01-01T00:00:00.005,1,1,1,1,1,1,1", "Hello,1970-01-01T00:00,1970-01-01T00:00:00.005,2,3,2,3,2,3,7", "Hello world,1970-01-01T00:00:00.015,1970-01-01T00:00:00.020,1,1,3,16,3,3,3", "Hello world,1970-01-01T00:00:00.005,1970-01-01T00:00:00.010,2,2,3,8,3,4,7"}));
        // Compare as newline-joined strings of the sorted rows.
        Assert.assertEquals((Object)((TraversableOnce)expected.sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"), (Object)((TraversableOnce)sink.getUpsertResults().sorted((Ordering)Ordering.String$.MODULE$)).mkString("\n"));
    }

    /**
     * Runs a {@code COUNT(DISTINCT b)} aggregation over an event-time SESSION group window
     * (gap {@code INTERVAL '0.005' SECOND}) grouped by {@code c}, and compares the sorted
     * append-sink output with three expected rows.
     *
     * <p>The test returns immediately, without running anything, when {@code useTimestampLtz} is set.
     */
    @Test
    public void testDistinctAggWithMergeOnEventTimeSessionGroupWindow() {
        // Nothing is executed or asserted for the TIMESTAMP_LTZ variant of this test.
        if (this.useTimestampLtz) {
            return;
        }
        // Seven (long, int, String) input tuples; note they are not listed in ascending order of the long field.
        List sessionWindowTestData = List$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new Tuple3[]{new Tuple3((Object)BoxesRunTime.boxToLong((long)1L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)2L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)8L), (Object)BoxesRunTime.boxToInteger((int)2), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)10L), (Object)BoxesRunTime.boxToInteger((int)3), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)9L), (Object)BoxesRunTime.boxToInteger((int)9), (Object)"Hello World"), new Tuple3((Object)BoxesRunTime.boxToLong((long)4L), (Object)BoxesRunTime.boxToInteger((int)1), (Object)"Hello"), new Tuple3((Object)BoxesRunTime.boxToLong((long)16L), (Object)BoxesRunTime.boxToInteger((int)16), (Object)"Hello")}));
        // Source stream built through failingDataSource with an inline CaseClassTypeInfo for the tuple type;
        // timestamps/watermarks are assigned with TimestampAndWatermarkWithOffset(10L) at the end of this statement.
        DataStream stream = this.failingDataSource(sessionWindowTestData, new CaseClassTypeInfo<Tuple3<Object, Object, String>>(this){

            // Synthetic accessor: returns the `types` array of the given type-info instance so the
            // nested function object below can read it.
            public /* synthetic */ TypeInformation[] protected$types($anon$6 x$1) {
                return x$1.types;
            }

            // Builds one serializer per tuple field and wraps them in a ScalaCaseClassSerializer.
            public TypeSerializer<Tuple3<Object, Object, String>> createSerializer(ExecutionConfig executionConfig) {
                TypeSerializer[] fieldSerializers = new TypeSerializer[this.getArity()];
                // For every index from 0 until getArity(), store that field type's serializer in fieldSerializers.
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), this.getArity()).foreach$mVc$sp((Function1)new Serializable(this, executionConfig, fieldSerializers){
                    public static final long serialVersionUID = 0L;
                    private final /* synthetic */ $anon$6 $outer;
                    private final ExecutionConfig executionConfig$3;
                    private final TypeSerializer[] fieldSerializers$3;

                    public final void apply(int i) {
                        this.apply$mcVI$sp(i);
                    }

                    // Creates the serializer for field i from the outer type info's `types` array.
                    public void apply$mcVI$sp(int i) {
                        this.fieldSerializers$3[i] = this.$outer.protected$types(this.$outer)[i].createSerializer(this.executionConfig$3);
                    }
                    // Initializer capturing the enclosing type info, the execution config and the target array.
                    {
                        if ($outer == null) {
                            throw null;
                        }
                        this.$outer = $outer;
                        this.executionConfig$3 = executionConfig$3;
                        this.fieldSerializers$3 = fieldSerializers$3;
                    }
                });
                // This anonymous serializer subclass is instantiated but never returned or otherwise used here.
                // NOTE(review): presumably retained on purpose (e.g. for serializer compatibility) — confirm before removing.
                ScalaCaseClassSerializer<Tuple3<Object, Object, String>> unused = new ScalaCaseClassSerializer<Tuple3<Object, Object, String>>(this, fieldSerializers){

                    // Rebuilds a Tuple3 from the three field values (long, int, String).
                    public Tuple3<Object, Object, String> createInstance(Object[] fields) {
                        return new Tuple3((Object)BoxesRunTime.boxToLong((long)BoxesRunTime.unboxToLong((Object)fields[0])), (Object)BoxesRunTime.boxToInteger((int)BoxesRunTime.unboxToInt((Object)fields[1])), (Object)((String)fields[2]));
                    }
                };
                // The serializer actually returned is a plain ScalaCaseClassSerializer over the same field serializers.
                return new ScalaCaseClassSerializer(this.getTypeClass(), fieldSerializers);
            }
        }).assignTimestampsAndWatermarks(new TimeTestUtil.TimestampAndWatermarkWithOffset(10L));
        // Convert the stream to a Table with four field expressions; the last one is declared as a rowtime attribute.
        Table table = org.apache.flink.table.api.bridge.scala.package$.MODULE$.dataStreamConversions(stream).toTable(this.tEnv(), (Seq)Predef$.MODULE$.wrapRefArray((Object[])new Expression[]{package$.MODULE$.symbol2FieldExpression(symbol$6), package$.MODULE$.symbol2FieldExpression(symbol$7), package$.MODULE$.symbol2FieldExpression(symbol$8), (Expression)package$.MODULE$.UnresolvedFieldExpression(symbol$1).rowtime()}));
        this.tEnv().registerTable("MyTable", table);
        // Query under test: distinct count of b per (c, session window), plus the session end timestamp.
        String sqlQuery = new StringOps(Predef$.MODULE$.augmentString("\n        |SELECT c,\n        |   COUNT(DISTINCT b),\n        |   SESSION_END(rowtime, INTERVAL '0.005' SECOND)\n        |FROM MyTable\n        |GROUP BY c, SESSION(rowtime, INTERVAL '0.005' SECOND)\n      ")).stripMargin();
        TestingAppendSink sink = new TestingAppendSink();
        // Emit the query result as an append stream of Row into the testing sink, then run the job.
        org.apache.flink.table.api.bridge.scala.package$.MODULE$.tableConversions(this.tEnv().sqlQuery(sqlQuery)).toAppendStream(TypeExtractor.createTypeInfo(Row.class)).addSink((SinkFunction)sink);
        this.env().execute();
        Seq expected = (Seq)Seq$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{"Hello World,1,1970-01-01T00:00:00.014", "Hello,1,1970-01-01T00:00:00.021", "Hello,3,1970-01-01T00:00:00.015"}));
        // Both sides are sorted before comparison, so the order in which rows reach the sink does not matter.
        Assert.assertEquals((Object)expected.sorted((Ordering)Ordering.String$.MODULE$), (Object)sink.getAppendResults().sorted((Ordering)Ordering.String$.MODULE$));
    }

    /**
     * Enables late-fire emission on {@code tableConfig} and sets its late-fire delay to {@code interval}.
     *
     * @param tableConfig the table configuration whose underlying configuration is updated
     * @param interval the late-fire delay to apply, converted to milliseconds
     * @throws RuntimeException if a late-fire delay is already configured and differs (in milliseconds)
     *     from {@code interval}
     */
    private void withLateFireDelay(TableConfig tableConfig, Time interval) {
        final long requestedMillis = interval.toMilliseconds();
        // Null when no late-fire delay has been configured yet.
        final Duration existingDelay = tableConfig.getConfiguration().getOptional(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY()).orElse(null);
        final boolean conflictsWithExisting = existingDelay != null && existingDelay.toMillis() != requestedMillis;
        if (conflictsWithExisting) {
            throw new RuntimeException("Currently not support different lateFireInterval configs in one job");
        }
        // Switch late firing on first, then record the delay itself.
        tableConfig.getConfiguration().setBoolean(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_ENABLED(), true);
        tableConfig.getConfiguration().set(WindowEmitStrategy$.MODULE$.TABLE_EXEC_EMIT_LATE_FIRE_DELAY(), (Object)Duration.ofMillis(requestedMillis));
    }

    /**
     * Creates the test case.
     *
     * @param mode state backend mode, forwarded to the superclass constructor
     * @param useTimestampLtz value stored in the {@code useTimestampLtz} field
     */
    public GroupWindowITCase(StreamingWithStateTestBase.StateBackendMode mode, boolean useTimestampLtz) {
        // NOTE(review): the field is assigned before super(mode), exactly as the decompiler emitted it;
        // presumably this mirrors the compiled Scala constructor — confirm before reordering.
        this.useTimestampLtz = useTimestampLtz;
        super(mode);
        // Zone id for "Asia/Shanghai", kept in SHANGHAI_ZONE.
        this.SHANGHAI_ZONE = ZoneId.of("Asia/Shanghai");
    }
}

