package org.apache.flink.table.planner.plan.stream.sql;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.api.scala.typeutils.ScalaCaseClassSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.package$;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdaf;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdf;
import org.apache.flink.table.planner.expressions.utils.TestNonDeterministicUdtf;
import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedTableFunctions;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sinks.UpsertStreamTableSink;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Array$;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Symbol;
import scala.Tuple3;
import scala.Tuple4;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.RichInt$;
import scala.runtime.SymbolLiteral;

/* compiled from: NonDeterministicDagTest.scala */
@RunWith(Parameterized.class)
@ScalaSignature(bytes = "\u0006\u0001\u0011=b\u0001B\u0001\u0003\u0001M\u0011qCT8o\t\u0016$XM]7j]&\u001cH/[2EC\u001e$Vm\u001d;\u000b\u0005\r!\u0011aA:rY*\u0011QAB\u0001\u0007gR\u0014X-Y7\u000b\u0005\u001dA\u0011\u0001\u00029mC:T!!\u0003\u0006\u0002\u000fAd\u0017M\u001c8fe*\u00111\u0002D\u0001\u0006i\u0006\u0014G.\u001a\u0006\u0003\u001b9\tQA\u001a7j].T!a\u0004\t\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\t\u0012aA8sO\u000e\u00011C\u0001\u0001\u0015!\t)\u0002$D\u0001\u0017\u0015\t9\u0002\"A\u0003vi&d7/\u0003\u0002\u001a-\tiA+\u00192mKR+7\u000f\u001e\"bg\u0016D\u0001b\u0007\u0001\u0003\u0002\u0003\u0006I\u0001H\u0001\u001f]>tG)\u001a;fe6Lg.[:uS\u000e,\u0006\u000fZ1uKN#(/\u0019;fOf\u0004\"!H\u001a\u000f\u0005y\u0001dBA\u0010.\u001d\t\u00013F\u0004\u0002\"U9\u0011!%\u000b\b\u0003G!r!\u0001J\u0014\u000e\u0003\u0015R!A\n\n\u0002\rq\u0012xn\u001c;?\u0013\u0005\t\u0012BA\b\u0011\u0013\tia\"\u0003\u0002\f\u0019%\u0011AFC\u0001\u0004CBL\u0017B\u0001\u00180\u0003\u0019\u0019wN\u001c4jO*\u0011AFC\u0005\u0003cI\nac\u00149uS6L'0\u001a:D_:4\u0017nZ(qi&|gn\u001d\u0006\u0003]=J!\u0001N\u001b\u0003=9{g\u000eR3uKJl\u0017N\\5ti&\u001cW\u000b\u001d3bi\u0016\u001cFO]1uK\u001eL(BA\u00193\u0011\u00159\u0004\u0001\"\u00019\u0003\u0019a\u0014N\\5u}Q\u0011\u0011h\u000f\t\u0003u\u0001i\u0011A\u0001\u0005\u00067Y\u0002\r\u0001\b\u0005\b{\u0001\u0011\r\u0011\"\u0003?\u0003\u0011)H/\u001b7\u0016\u0003}\u0002\"!\u0006!\n\u0005\u00053\"aE*ue\u0016\fW\u000eV1cY\u0016$Vm\u001d;Vi&d\u0007BB\"\u0001A\u0003%q(A\u0003vi&d\u0007\u0005C\u0004F\u0001\t\u0007I\u0011\u0002$\u0002\u0015Q\u0014\u0018PU3t_24X-F\u0001H!\tA5*D\u0001J\u0015\u0005Q\u0015!B:dC2\f\u0017B\u0001'J\u0005\u001d\u0011un\u001c7fC:DaA\u0014\u0001!\u0002\u00139\u0015a\u0003;ssJ+7o\u001c7wK\u0002BQ\u0001\u0015\u0001\u0005\u0002E\u000baAY3g_J,G#\u0001*\u0011\u0005!\u001b\u0016B\u0001+J\u0005\u0011)f.\u001b;)\u0005=3\u0006CA,[\u001b\u0005A&BA-\u0011\u0003\u0015QWO\\5u\u0013\tY\u0006L\u0001\u0004CK\u001a|'/\u001a\u0005\u0006;\u0002!\t!U\u0001\u001ai\u0016\u001cHo\u00113d/&$\b.T3uCNKgn[,ji\"\u00046\u000e\u000b\u0002]?B\u0011q\u000bY\u0005\u0003Cb\u0013A\u0001V3ti\")1\r\u0001C\u0001#\u0006yC/Z:u\u001d>tG)\u001a;fe6Lg.[:uS\u000e\u0004&o\u001c6fGRLwN\\,ji\"\u001c\u0016N\\6XSRDw.\u001e;QW\"\u0012!m\u0018\u0005\u0006M\u0002!\t!U\u0001\u001di\u0016\u001cHo\u00113d/&$\b.T3uCNKgn[,ji\"|W\u000f\u001e)lQ\t)w\fC\u0003j\u0001\u0011\u0005\u0011+A\u0010uKN$8\tZ2XSRDW*\u001a;b\u0019\u0016<\u0017mY=TS:\\w+\u001b;i!.D#\u0001[0\t\u000b1\u0004A\u0011A)\u0002EQ,7\u000f^\"eG^KG\u000f['fi\u0006dUmZ1dsNKgn[,ji\"|W\u000f\u001e)lQ\tYw\fC\u0003p\u0001\u0011\u0005\u0011+\u0001\u0012uKN$8\tZ2XSRDW*\u001a;b'&t7nV5uQ\u000e{W\u000e]8tSR,\u0007k\u001b\u0015\u0003]~CQA\u001d\u0001\u0005\u0002E\u000b\u0001\u0006^3ti\u000e#7mV5uQ6+G/\u0019*f]\u0006lWmU5oW^KG\u000f[\"p[B|7/\u001b;f!.D#!]0\t\u000bU\u0004A\u0011A)\u0002MQ,7\u000f^*pkJ\u001cWmV5uQ\u000e{W\u000e];uK\u0012\u001cu\u000e\\;n]NKgn[,ji\"\u00046\u000e\u000b\u0002u?\")\u0001\u0010\u0001C\u0001#\u0006)C/Z:u'>,(oY3XSRD7i\\7qkR,GmQ8mk6tW*\u001e7uSNKgn\u001b\u0015\u0003o~CQa\u001f\u0001\u0005\u0002E\u000ba\u0006^3ti\u000e#7mQ8se\u0016d\u0017\r^3O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY*j].<\u0016\u000e\u001e5Q\u0017\"\u0012!p\u0018\u0005\u0006}\u0002!\t!U\u00011i\u0016\u001cHo\u00113d\u0007>\u0014(/\u001a7bi\u0016tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR;oG:{G*\u001a4u\u001fV$\b/\u001e;)\u0005u|\u0006BBA\u0002\u0001\u0011\u0005\u0011+A\u0019uKN$8\tZ2D_J\u0014X\r\\1uK:{g\u000eR3uKJl\u0017N\\5ti&\u001cg)\u001e8d\u001d>\u0014\u0016n\u001a5u\u001fV$\b/\u001e;)\u0007\u0005\u0005q\f\u0003\u0004\u0002\n\u0001!\t!U\u0001,i\u0016\u001cHo\u00113d\u0007>\u0014(/\u001a7bi\u0016|eNT8o\t\u0016$XM]7j]&\u001cH/[2D_:$\u0017\u000e^5p]\"\u001a\u0011qA0\t\r\u0005=\u0001\u0001\"\u0001R\u0003\t\"Xm\u001d;DI\u000e<\u0016\u000e\u001e5NKR\f7i\u001c:sK2\fG/Z*j].<\u0016\u000e\u001e5QW\"\u001a\u0011QB0\t\r\u0005U\u0001\u0001\"\u0001R\u0003%\"Xm\u001d;DI\u000e<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY*j].<\u0016\u000e\u001e5QW\"\u001a\u00111C0\t\r\u0005m\u0001\u0001\"\u0001R\u00031\"Xm\u001d;DI\u000e<\u0016\u000e\u001e5O_:$U\r^3s[&t\u0017n\u001d;jG\u001a+hnY*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002\u001a}Ca!!\t\u0001\t\u0003\t\u0016!\t;fgR\u001cEmY,ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7MR5mi\u0016\u0014\bfAA\u0010?\"1\u0011q\u0005\u0001\u0005\u0002E\u000ba\u0004^3ti\u000e#7MS8j]\u0012KWnV5uQB[7+\u001b8l/&$\b\u000eU6)\u0007\u0005\u0015r\f\u0003\u0004\u0002.\u0001!\t!U\u0001\"i\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n/&$\bn\\;u!.\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003Wy\u0006BBA\u001a\u0001\u0011\u0005\u0011+\u0001\u0012uKN$8\tZ2MK\u001a$(j\\5o\t&lw+\u001b;i!.\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003cy\u0006BBA\u001d\u0001\u0011\u0005\u0011+A\u0011uKN$8\tZ2K_&tG)[7XSRD\u0007k[*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u00028}Ca!a\u0010\u0001\t\u0003\t\u0016\u0001\n;fgR\u001cEm\u0019&pS:$\u0015.\\,ji\"|W\u000f\u001e)l'&t7nV5uQ>,H\u000fU6)\u0007\u0005ur\f\u0003\u0004\u0002F\u0001!\t!U\u0001&i\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n/&$\b\u000eU6P]2L8+\u001b8l/&$\bn\\;u!.D3!a\u0011`\u0011\u0019\tY\u0005\u0001C\u0001#\u0006AC/Z:u\u0007\u0012\u001cG*\u001a4u\u0015>Lg\u000eR5n/&$\bn\\;u!.\u001c\u0016N\\6XSRDw.\u001e;QW\"\u001a\u0011\u0011J0\t\r\u0005E\u0003\u0001\"\u0001R\u0003-\"Xm\u001d;DI\u000eTu.\u001b8ES6<\u0016\u000e\u001e5QW>+H\u000f];u\u001d>\u00046nU5oW^KG\u000f[8viB[\u0007fAA(?\"1\u0011q\u000b\u0001\u0005\u0002E\u000bQ\u0007^3ti\u000e#7MS8j]\u0012KWnV5uQB[gj\u001c8EKR,'/\\5oSN$\u0018n\u0019$v]\u000e\u001c\u0016N\\6XSRDw.\u001e;QW\"\u001a\u0011QK0\t\r\u0005u\u0003\u0001\"\u0001R\u0003I\"Xm\u001d;DI\u000eTu.\u001b8ES6<\u0016\u000e\u001e5QW:{g\u000eR3uKJl\u0017N\\5ti&\u001cGj\\2bY\u000e{g\u000eZ5uS>t\u0007fAA.?\"1\u00111\r\u0001\u0005\u0002E\u000b1\u0007^3ti\u000e#7MS8j]\u0012KWnV5uQB[gj\u001c8EKR,'/\\5oSN$\u0018n\u0019'pG\u0006d7i\u001c8eSRLwN\u001c\u001a)\u0007\u0005\u0005t\f\u0003\u0004\u0002j\u0001!\t!U\u00011i\u0016\u001cHo\u00113d\u0015>Lg\u000eR5n\u001d>tG)\u001a;fe6Lg.[:uS\u000e\u0014V-\\1j]&twmQ8oI&$\u0018n\u001c8)\u0007\u0005\u001dt\f\u0003\u0004\u0002p\u0001!\t!U\u0001-i\u0016\u001cHo\u0012:pkB\u0014\u0015PT8o\t\u0016$XM]7j]&\u001cH/[2Gk:\u001cw+\u001b;i\u0007\u0012\u001c7k\\;sG\u0016D3!!\u001c`\u0011\u0019\t)\b\u0001C\u0001#\u0006YC/Z:u\u000fJ|W\u000f\u001d\"z\u001d>tG)\u001a;fe6Lg.[:uS\u000e,FMZ,ji\"\u001cEmY*pkJ\u001cW\rK\u0002\u0002t}Ca!a\u001f\u0001\t\u0003\t\u0016!\f;fgRtUm\u001d;fI\u0006;wmV5uQ:{g\u000eR3uKJl\u0017N\\5ti&\u001cwI]8va&twmS3zg\"\u001a\u0011\u0011P0\t\r\u0005\u0005\u0005\u0001\"\u0001R\u0003)\"Xm\u001d;He>,\b/Q4h\u001d>tG)\u001a;fe6Lg.[:uS\u000e4UO\\2P]N{WO]2f!.D3!a `\u0011\u0019\t9\t\u0001C\u0001#\u0006)C/Z:u\u0003\u001e<w+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e4\u0015\u000e\u001c;fe\u0006\u0013xm\u001d\u0015\u0004\u0003\u000b{\u0006BBAG\u0001\u0011\u0005\u0011+\u0001\u0019uKN$\u0018iZ4XSRDgj\u001c8EKR,'/\\5oSN$\u0018n\u0019$jYR,'/\u0011:hg>s7\tZ2T_V\u00148-\u001a\u0015\u0004\u0003\u0017{\u0006BBAJ\u0001\u0011\u0005\u0011+A\u001fuKN$\u0018iZ4XSRDgj\u001c8EKR,'/\\5oSN$\u0018n\u0019$jYR,'/\u0011:hg>s7\tZ2T_V\u00148-Z*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002\u0012~Ca!!'\u0001\t\u0003\t\u0016a\f;fgRtuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-Q4h\u001f:\f\u0005\u000f]3oIN{WO]2f'&t7nV5uQB[\u0007fAAL?\"1\u0011q\u0014\u0001\u0005\u0002E\u000b!\u0007^3ti:{g\u000eR3uKJl\u0017N\\5ti&\u001c\u0017iZ4P]\u0006\u0003\b/\u001a8e'>,(oY3TS:\\w+\u001b;i_V$\bk\u001b\u0015\u0004\u0003;{\u0006BBAS\u0001\u0011\u0005\u0011+A\u001buKN$x\t\\8cC2tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-Q4h\u001f:\f\u0005\u000f]3oIN{WO]2f'&t7nV5uQB[\u0007fAAR?\"1\u00111\u0016\u0001\u0005\u0002E\u000b\u0001\b^3ti\u001ecwNY1m\u001d>tG)\u001a;fe6Lg.[:uS\u000e\fumZ(o\u0003B\u0004XM\u001c3T_V\u00148-Z*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002*~Ca!!-\u0001\t\u0003\t\u0016A\u0007;fgR,\u0006o]3siN{WO]2f'&t7nV5uQB[\u0007fAAX?\"1\u0011q\u0017\u0001\u0005\u0002E\u000bQ\u0004^3tiV\u00038/\u001a:u'>,(oY3TS:\\w+\u001b;i_V$\bk\u001b\u0015\u0004\u0003k{\u0006BBA_\u0001\u0011\u0005\u0011+A\u0018uKN$X*\u001e7uS>3XM],ji\"tuN\u001c#fi\u0016\u0014X.\u001b8jgRL7-\u00163bMNKgn[,ji\"\u00046\u000eK\u0002\u0002<~Ca!a1\u0001\t\u0003\t\u0016!\f;fgR|e/\u001a:XSRDgj\u001c8EKR,'/\\5oSN$\u0018nY+eC\u001a\u001c\u0016N\\6XSRDw.\u001e;QW\"\u001a\u0011\u0011Y0\t\r\u0005%\u0007\u0001\"\u0001R\u0003Q\"Xm\u001d;Nk2$\u0018n\u0014<fe^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\u0003\u001e<g)\u001b7uKJ\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0003\u000f|\u0006BBAh\u0001\u0011\u0005\u0011+A\u001euKN$\u0018\t\u001d9f]\u0012\u0014\u0016M\\6P]6+H\u000e^5Pm\u0016\u0014x+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e,F-\u00194TS:\\w+\u001b;i!.D3!!4`\u0011\u0019\t)\u000e\u0001C\u0001#\u0006qD/Z:u\u0003B\u0004XM\u001c3SC:\\wJ\\'vYRLwJ^3s/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2VI\u000647+\u001b8l/&$\bn\\;u!.D3!a5`\u0011\u0019\tY\u000e\u0001C\u0001#\u00069C/Z:u+B$\u0017\r^3SC:\\w*\u001e;qkR\u0014vn\u001e(v[\n,'oU5oW^KG\u000f\u001b)lQ\r\tIn\u0018\u0005\u0007\u0003C\u0004A\u0011A)\u0002QQ,7\u000f\u001e*fiJ\f7\r\u001e*b].|U\u000f\u001e9viJ{wOT;nE\u0016\u00148+\u001b8l/&$\b\u000eU6)\u0007\u0005}w\f\u0003\u0004\u0002h\u0002!\t!U\u0001\u001di\u0016\u001cH/\u00168j_:\u001c\u0016N\\6XSRD7i\\7q_NLG/\u001a)lQ\r\t)o\u0018\u0005\u0007\u0003[\u0004A\u0011A)\u0002?Q,7\u000f^+oS>t\u0017\t\u001c7TS:\\w+\u001b;i\u0007>l\u0007o\\:ji\u0016\u00046\u000eK\u0002\u0002l~Ca!a=\u0001\t\u0003\t\u0016!\u0007;fgR,f.[8o\u00032d7+\u001b8l/&$\bn\\;u!.D3!!=`\u0011\u0019\tI\u0010\u0001C\u0001#\u0006AC/Z:u\u0007\u0012\u001c'j\\5o/&$\bNT8o\t\u0016$XM]7j]&\u001cH/[2D_:$\u0017\u000e^5p]\"\u001a\u0011q_0\t\r\u0005}\b\u0001\"\u0001R\u0003\u0015\"Xm\u001d;Qe>\u001cG/[7f\u0013:$XM\u001d<bY*{\u0017N\\*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0002~~CaA!\u0002\u0001\t\u0003\t\u0016\u0001\f;fgR\u001cEm\u0019)s_\u000e$\u0018.\\3J]R,'O^1m\u0015>Lgn\u00148QWNKgn[,ji\"|W\u000f\u001e)lQ\r\u0011\u0019a\u0018\u0005\u0007\u0005\u0017\u0001A\u0011A)\u0002_Q,7\u000f^\"eGB\u0013xn\u0019;j[\u0016Le\u000e^3sm\u0006d'j\\5o\u001f:tuN\u001c)l'&t7nV5uQ>,H\u000fU6)\u0007\t%q\f\u0003\u0004\u0003\u0012\u0001!\t!U\u0001(i\u0016\u001cHo\u00113d%><H/[7f\u0013:$XM\u001d<bY*{\u0017N\\*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0003\u0010}CaAa\u0006\u0001\t\u0003\t\u0016\u0001\n;fgR\u001cEm\u0019*poRLW.Z%oi\u0016\u0014h/\u00197K_&t7+\u001b8l/&$\b\u000eU6)\u0007\tUq\f\u0003\u0004\u0003\u001e\u0001!\t!U\u0001\u0016i\u0016\u001cHOS8j].+\u0017pQ8oi\u0006Lgn]+lQ\r\u0011Yb\u0018\u0005\u0007\u0005G\u0001A\u0011A)\u0002-Q,7\u000f\u001e&pS:D\u0015m\u001d\"pi\"\u001c\u0016\u000eZ3t+.D3A!\t`\u0011\u0019\u0011I\u0003\u0001C\u0001#\u0006\u0019C/Z:u\u0015>Lg\u000eS1t\u0005>$\bnU5eKN,6nU5oW^KG\u000f[8viB[\u0007f\u0001B\u0014?\"1!q\u0006\u0001\u0005\u0002E\u000bq\u0003^3ti*{\u0017N\u001c%bgNKgn\u001a7f'&$W-V6)\u0007\t5r\f\u0003\u0004\u00036\u0001!\t!U\u0001\u001ai\u0016\u001cHoU3nS*{\u0017N\\&fs\u000e{g\u000e^1j]N,6\u000eK\u0002\u00034}CaAa\u000f\u0001\t\u0003\t\u0016!\u0007;fgR\fe\u000e^5K_&t7*Z=D_:$\u0018-\u001b8t+.D3A!\u000f`\u0011\u0019\u0011\t\u0005\u0001C\u0001#\u0006AD/Z:u'\u0016l\u0017NS8j]^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\u0007>tG-\u001b;j_:\u001c\u0016N\\4mKNKG-\u001a%bgV[\u0007f\u0001B ?\"1!q\t\u0001\u0005\u0002E\u000bq\u0006^3ti\u000e#7MS8j]^KG\u000f\u001b(p]\u0012+G/\u001a:nS:L7\u000f^5d\u001fV$\b/\u001e;TS:\\w+\u001b;i!.D3A!\u0012`\u0011\u0019\u0011i\u0005\u0001C\u0001#\u0006aC/Z:u!J|7\r^5nK\u0012+G-\u001e9P]\u000e#7mV5uQ6+G/\u00193bi\u0006\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0005\u0017z\u0006B\u0002B*\u0001\u0011\u0005\u0011+A\u0018uKN$\bK]8di&lW\rR3ekB|en\u00113d/&$\b.T3uC\u0012\fG/Y*j].<\u0016\u000e\u001e5pkR\u00046\u000eK\u0002\u0003R}CaA!\u0017\u0001\t\u0003\t\u0016a\u000b;fgR\u0014vn\u001e;j[\u0016$U\rZ;q\u001f:\u001cEmY,ji\"lU\r^1eCR\f7+\u001b8l/&$\b\u000eU6)\u0007\t]s\f\u0003\u0004\u0003`\u0001!\t!U\u0001!i\u0016\u001cHoV5oI><H)\u001a3va>s7\tZ2XSRDW*\u001a;bI\u0006$\u0018\rK\u0002\u0003^}CaA!\u001a\u0001\t\u0003\t\u0016!\b;fgRtUm\u001d;fIN{WO]2f/&$\b.T;mi&\u001c\u0016N\\6)\u0007\t\rt\f\u0003\u0004\u0003l\u0001!\t!U\u0001\u001ai\u0016\u001cH/T;mi&\u001c\u0016N\\6P]*{\u0017N\\3e-&,w\u000fK\u0002\u0003j}CaA!\u001d\u0001\t\u0003\t\u0016\u0001\b;fgRl\u0015\r^2i%\u0016\u001cwn\u001a8ju\u0016\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0005_z\u0006B\u0002B<\u0001\u0011\u0005\u0011+\u0001 uKN$X*\u0019;dQJ+7m\\4oSj,w+\u001b;i\u001d>tG)\u001a;fe6Lg.[:uS\u000e\u001cuN\u001c3ji&|gn\u00148DI\u000e\u001c\u0016N\\6XSRD\u0007k\u001b\u0015\u0004\u0005kz\u0006B\u0002B?\u0001\u0011\u0005\u0011+A\u0017uKN$X*\u0019;dQJ+7m\\4oSj,wJ\\\"eG^KG\u000f['fi\u0006$\u0015\r^1TS:\\w+\u001b;i!.D3Aa\u001f`\r\u0019\u0011\u0019\t\u0001\u0002\u0003\u0006\n\tB+Z:uS:<W\u000b]:feR\u001c\u0016N\\6\u0014\r\t\u0005%q\u0011BL!\u0011\u0011IIa%\u000e\u0005\t-%\u0002\u0002BG\u0005\u001f\u000bA\u0001\\1oO*\u0011!\u0011S\u0001\u0005U\u00064\u0018-\u0003\u0003\u0003\u0016\n-%AB(cU\u0016\u001cG\u000f\u0005\u0004\u0003\u001a\n}%1U\u0007\u0003\u00057S1A!(\u000b\u0003\u0015\u0019\u0018N\\6t\u0013\u0011\u0011\tKa'\u0003+U\u00038/\u001a:u'R\u0014X-Y7UC\ndWmU5oWB!!Q\u0015BV\u001b\t\u00119KC\u0002\u0003**\tA\u0001Z1uC&!!Q\u0016BT\u0005\u001d\u0011vn\u001e#bi\u0006D1B!-\u0003\u0002\n\u0015\r\u0011\"\u0001\u00034\u0006!1.Z=t+\t\u0011)\fE\u0003I\u0005o\u0013Y,C\u0002\u0003:&\u0013Q!\u0011:sCf\u0004BA!0\u0003F:!!q\u0018Ba!\t!\u0013*C\u0002\u0003D&\u000ba\u0001\u0015:fI\u00164\u0017\u0002\u0002Bd\u0005\u0013\u0014aa\u0015;sS:<'b\u0001Bb\u0013\"Y!Q\u001aBA\u0005\u0003\u0005\u000b\u0011\u0002B[\u0003\u0015YW-_:!\u0011-\u0011\tN!!\u0003\u0006\u0004%\tAa-\u0002\u0015\u0019LW\r\u001c3OC6,7\u000fC\u0006\u0003V\n\u0005%\u0011!Q\u0001\n\tU\u0016a\u00034jK2$g*Y7fg\u0002B1B!7\u0003\u0002\n\u0015\r\u0011\"\u0001\u0003\\\u0006Qa-[3mIRK\b/Z:\u0016\u0005\tu\u0007#\u0002%\u00038\n}\u0007\u0003\u0002Bq\u0005Ol!Aa9\u000b\u0007\t\u0015(\"A\u0003usB,7/\u0003\u0003\u0003j\n\r(\u0001\u0003#bi\u0006$\u0016\u0010]3\t\u0017\t5(\u0011\u0011B\u0001B\u0003%!Q\\\u0001\fM&,G\u000e\u001a+za\u0016\u001c\b\u0005C\u00048\u0005\u0003#\tA!=\u0015\u0011\tM(q\u001fB}\u0005w\u0004BA!>\u0003\u00026\t\u0001\u0001\u0003\u0005\u00032\n=\b\u0019\u0001B[\u0011!\u0011\tNa<A\u0002\tU\u0006\u0002\u0003Bm\u0005_\u0004\rA!8\t\u0015\t}(\u0011\u0011a\u0001\n\u0003\u0019\t!\u0001\u0007fqB,7\r^3e\u0017\u0016L8/\u0006\u0002\u0004\u0004A)\u0001j!\u0002\u00036&\u00191qA%\u0003\r=\u0003H/[8o\u0011)\u0019YA!!A\u0002\u0013\u00051QB\u0001\u0011Kb\u0004Xm\u0019;fI.+\u0017p]0%KF$2AUB\b\u0011)\u0019\tb!\u0003\u0002\u0002\u0003\u000711A\u0001\u0004q\u0012\n\u0004\"CB\u000b\u0005\u0003\u0003\u000b\u0015BB\u0002\u00035)\u0007\u0010]3di\u0016$7*Z=tA!Q1\u0011\u0004BA\u0001\u0004%\taa\u0007\u0002)\u0015D\b/Z2uK\u0012L5/\u00119qK:$wJ\u001c7z+\t\u0019i\u0002\u0005\u0003I\u0007\u000b9\u0005BCB\u0011\u0005\u0003\u0003\r\u0011\"\u0001\u0004$\u0005AR\r\u001f9fGR,G-S:BaB,g\u000eZ(oYf|F%Z9\u0015\u0007I\u001b)\u0003\u0003\u0006\u0004\u0012\r}\u0011\u0011!a\u0001\u0007;A\u0011b!\u000b\u0003\u0002\u0002\u0006Ka!\b\u0002+\u0015D\b/Z2uK\u0012L5/\u00119qK:$wJ\u001c7zA!A1Q\u0006BA\t\u0003\u001ay#\u0001\bhKR$\u0016M\u00197f'\u000eDW-\\1\u0015\u0005\rE\u0002\u0003BB\u001a\u0007ki\u0011aL\u0005\u0004\u0007oy#a\u0003+bE2,7k\u00195f[\u0006D\u0001ba\u000f\u0003\u0002\u0012\u00053QH\u0001\rg\u0016$8*Z=GS\u0016dGm\u001d\u000b\u0004%\u000e}\u0002\u0002\u0003BY\u0007s\u0001\rA!.\t\u0011\r\r#\u0011\u0011C!\u0007\u000b\nqb]3u\u0013N\f\u0005\u000f]3oI>sG.\u001f\u000b\u0004%\u000e\u001d\u0003\u0002CB%\u0007\u0003\u0002\raa\u0013\u0002\u0019%\u001c\u0018\t\u001d9f]\u0012|e\u000e\\=\u0011\t\r53q\u000b\b\u0005\u0007\u001f\u001a\u0019FD\u0002!\u0007#J!!\u0003\u0006\n\u0007\rU\u0003\"A\u0004qC\u000e\\\u0017mZ3\n\t\re31\f\u0002\t\u0015\n{w\u000e\\3b]*\u00191Q\u000b\u0005\t\u0011\r}#\u0011\u0011C!\u0007C\nQbZ3u%\u0016\u001cwN\u001d3UsB,GCAB2!\u0019\u0019)g!\u001d\u0003$6\u00111q\r\u0006\u0005\u0007S\u001aY'\u0001\u0005usB,\u0017N\u001c4p\u0015\u0011\u0019iga\u001c\u0002\r\r|W.\\8o\u0015\taC\"\u0003\u0003\u0004t\r\u001d$a\u0004+za\u0016LeNZ8s[\u0006$\u0018n\u001c8\t\u0011\r]$\u0011\u0011C!\u0007s\n\u0011cY8ogVlW\rR1uCN#(/Z1n)\u0011\u0019Yh!*1\t\ru41\u0013\t\u0007\u0007\u007f\u001aYia$\u000e\u0005\r\u0005%\u0002BBB\u0007\u000b\u000b!\u0002Z1uCN$(/Z1n\u0015\ra3q\u0011\u0006\u0004\u0007\u0013c\u0011!C:ue\u0016\fW.\u001b8h\u0013\u0011\u0019ii!!\u0003\u001d\u0011\u000bG/Y*ue\u0016\fWnU5oWB!1\u0011SBJ\u0019\u0001!Ab!&\u0004v\u0005\u0005\t\u0011!B\u0001\u0007/\u00131a\u0018\u00132#\u0011\u0019Ija(\u0011\u0007!\u001bY*C\u0002\u0004\u001e&\u0013qAT8uQ&tw\rE\u0002I\u0007CK1aa)J\u0005\r\te.\u001f\u0005\t\u0007O\u001b)\b1\u0001\u0004*\u0006QA-\u0019;b'R\u0014X-Y7\u0011\r\r}41VBX\u0013\u0011\u0019ik!!\u0003\u0015\u0011\u000bG/Y*ue\u0016\fW\u000e\u0005\u0005\u00042\u000ee61\nBR\u001b\t\u0019\u0019L\u0003\u0003\u00046\u000e]\u0016!\u0002;va2,'\u0002\u0002BI\u0007_JAaa/\u00044\n1A+\u001e9mKJB\u0001ba0\u0003\u0002\u0012\u00053\u0011Y\u0001\nG>tg-[4ve\u0016$bAa=\u0004D\u000e\u001d\u0007\u0002CBc\u0007{\u0003\rA!.\u0002\r\u0019t\u0015-\\3t\u0011!\u0019Im!0A\u0002\r-\u0017A\u00024UsB,7\u000fE\u0003I\u0005o\u001bi\r\r\u0003\u0004P\u000eM\u0007CBB3\u0007c\u001a\t\u000e\u0005\u0003\u0004\u0012\u000eMG\u0001DBk\u0007\u000f\f\t\u0011!A\u0003\u0002\r]%aA0%e!:\u0001a!7\u0004f\u000e\u001d\b\u0003BBn\u0007Cl!a!8\u000b\u0007\r}\u0007,\u0001\u0004sk:tWM]\u0005\u0005\u0007G\u001ciNA\u0004Sk:<\u0016\u000e\u001e5\u0002\u000bY\fG.^3$\u0005\r%\b\u0003BBv\u0007cl!a!<\u000b\u0007\r=\b,A\u0004sk:tWM]:\n\t\rM8Q\u001e\u0002\u000e!\u0006\u0014\u0018-\\3uKJL'0\u001a3\b\u000f\r](\u0001#\u0001\u0004z\u00069bj\u001c8EKR,'/\\5oSN$\u0018n\u0019#bOR+7\u000f\u001e\t\u0004u\rmhAB\u0001\u0003\u0011\u0003\u0019ip\u0005\u0003\u0004|\u000e}\bc\u0001%\u0005\u0002%\u0019A1A%\u0003\r\u0005s\u0017PU3g\u0011\u001d941 C\u0001\t\u000f!\"a!?\t\u0011\u0011-11 C\u0001\t\u001b\t!\u0002]1sC6,G/\u001a:t)\t!y\u0001E\u0003\u0005\u0012\u0011UA$\u0004\u0002\u0005\u0014)\u0019QHa$\n\t\u0011]A1\u0003\u0002\u000b\u0007>dG.Z2uS>t\u0007\u0006\u0003C\u0005\t7!I\u0003b\u000b\u0011\t\u0011uA1\u0005\b\u0005\u0007W$y\"\u0003\u0003\u0005\"\r5\u0018!\u0004)be\u0006lW\r^3sSj,G-\u0003\u0003\u0005&\u0011\u001d\"A\u0003)be\u0006lW\r^3sg*!A\u0011EBw\u0003\u0011q\u0017-\\3\"\u0005\u00115\u0012A\t8p]\u0012+G/\u001a:nS:L7\u000f^5d+B$\u0017\r^3TiJ\fG/Z4z{m\u0004T\u0010")
/* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest.class */
public class NonDeterministicDagTest extends TableTestBase {
    private final OptimizerConfigOptions.NonDeterministicUpdateStrategy nonDeterministicUpdateStrategy;
    private final StreamTableTestUtil util = streamTestUtil(streamTestUtil$default$1());
    private final boolean tryResolve;

    /* compiled from: NonDeterministicDagTest.scala */
    /* loaded from: input_file:org/apache/flink/table/planner/plan/stream/sql/NonDeterministicDagTest$TestingUpsertSink.class */
    public final class TestingUpsertSink implements UpsertStreamTableSink<RowData> {
        private final String[] keys;
        private final String[] fieldNames;
        private final DataType[] fieldTypes;
        private Option<String[]> expectedKeys;
        private Option<Object> expectedIsAppendOnly;
        private final /* synthetic */ NonDeterministicDagTest $outer;

        public TypeInformation<Tuple2<Boolean, RowData>> getOutputType() {
            return super.getOutputType();
        }

        public String[] keys() {
            return this.keys;
        }

        public String[] fieldNames() {
            return this.fieldNames;
        }

        public DataType[] fieldTypes() {
            return this.fieldTypes;
        }

        public Option<String[]> expectedKeys() {
            return this.expectedKeys;
        }

        public void expectedKeys_$eq(Option<String[]> option) {
            this.expectedKeys = option;
        }

        public Option<Object> expectedIsAppendOnly() {
            return this.expectedIsAppendOnly;
        }

        public void expectedIsAppendOnly_$eq(Option<Object> option) {
            this.expectedIsAppendOnly = option;
        }

        public TableSchema getTableSchema() {
            TableSchema.Builder builder = TableSchema.builder();
            Predef$.MODULE$.assert(fieldNames().length == fieldTypes().length);
            builder.fields(fieldNames(), fieldTypes());
            if (keys() == null || !new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(keys())).nonEmpty()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                builder.primaryKey(keys());
            }
            return builder.build();
        }

        public void setKeyFields(String[] strArr) {
            if (expectedKeys().isDefined() && strArr == null) {
                throw new AssertionError("Provided key fields should not be null.");
            }
            if (expectedKeys().isEmpty()) {
            }
        }

        public void setIsAppendOnly(Boolean bool) {
            if (expectedIsAppendOnly().isEmpty()) {
            }
        }

        public TypeInformation<RowData> getRecordType() {
            return InternalTypeInfo.ofFields((LogicalType[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(fieldTypes())).map(dataType -> {
                return dataType.getLogicalType();
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(LogicalType.class))), fieldNames());
        }

        public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) {
            throw Predef$.MODULE$.$qmark$qmark$qmark();
        }

        public TestingUpsertSink configure(String[] strArr, TypeInformation<?>[] typeInformationArr) {
            return new TestingUpsertSink(this.$outer, keys(), strArr, (DataType[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(typeInformationArr)).map(typeInformation -> {
                return TypeConversions.fromLegacyInfoToDataType(typeInformation);
            }, Array$.MODULE$.canBuildFrom(ClassTag$.MODULE$.apply(DataType.class))));
        }

        /* renamed from: configure, reason: collision with other method in class */
        public /* bridge */ /* synthetic */ TableSink m1185configure(String[] strArr, TypeInformation[] typeInformationArr) {
            return configure(strArr, (TypeInformation<?>[]) typeInformationArr);
        }

        public TestingUpsertSink(NonDeterministicDagTest nonDeterministicDagTest, String[] strArr, String[] strArr2, DataType[] dataTypeArr) {
            this.keys = strArr;
            this.fieldNames = strArr2;
            this.fieldTypes = dataTypeArr;
            if (nonDeterministicDagTest == null) {
                throw null;
            }
            this.$outer = nonDeterministicDagTest;
            this.expectedKeys = None$.MODULE$;
            this.expectedIsAppendOnly = None$.MODULE$;
        }
    }

    @Parameterized.Parameters(name = "nonDeterministicUpdateStrategy={0}")
    public static Collection<OptimizerConfigOptions.NonDeterministicUpdateStrategy> parameters() {
        return NonDeterministicDagTest$.MODULE$.parameters();
    }

    private StreamTableTestUtil util() {
        return this.util;
    }

    private boolean tryResolve() {
        return this.tryResolve;
    }

    @Before
    public void before() {
        util().tableConfig().getConfiguration().set(OptimizerConfigOptions.TABLE_OPTIMIZER_NONDETERMINISTIC_UPDATE_STRATEGY, this.nonDeterministicUpdateStrategy);
        util().tableEnv().getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, BoxesRunTime.boxToInteger(4));
        final NonDeterministicDagTest nonDeterministicDagTest = null;
        util().addTableSource("T", (Seq<Expression>) Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "a").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "b").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "c").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "d").dynamicInvoker().invoke() /* invoke-custom */)}), (TypeInformation) new CaseClassTypeInfo<Tuple4<Object, Object, String, Object>>(nonDeterministicDagTest) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$3
            public /* synthetic */ TypeInformation[] protected$types(NonDeterministicDagTest$$anon$3 nonDeterministicDagTest$$anon$3) {
                return nonDeterministicDagTest$$anon$3.types;
            }

            public TypeSerializer<Tuple4<Object, Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple4<Object, Object, String, Object>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$3$$anon$1
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple4<Object, Object, String, Object> m1182createInstance(Object[] objArr) {
                        return new Tuple4<>(BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[0])), BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[1])), (String) objArr[2], BoxesRunTime.boxToBoolean(BoxesRunTime.unboxToBoolean(objArr[3])));
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple4.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Boolean.TYPE), Nil$.MODULE$)))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Boolean.TYPE), Nil$.MODULE$)))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3", "_4"})));
            }
        });
        final NonDeterministicDagTest nonDeterministicDagTest2 = null;
        util().addDataStream("T1", Predef$.MODULE$.wrapRefArray(new Expression[]{package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "a").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "b").dynamicInvoker().invoke() /* invoke-custom */), package$.MODULE$.symbol2FieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "c").dynamicInvoker().invoke() /* invoke-custom */), (Expression) package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "proctime").dynamicInvoker().invoke() /* invoke-custom */).proctime(), (Expression) package$.MODULE$.UnresolvedFieldExpression((Symbol) SymbolLiteral.bootstrap(MethodHandles.lookup(), "apply", MethodType.methodType(Symbol.class), "rowtime").dynamicInvoker().invoke() /* invoke-custom */).rowtime()}), new CaseClassTypeInfo<Tuple3<Object, String, Object>>(nonDeterministicDagTest2) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$4
            public /* synthetic */ TypeInformation[] protected$types(NonDeterministicDagTest$$anon$4 nonDeterministicDagTest$$anon$4) {
                return nonDeterministicDagTest$$anon$4.types;
            }

            public TypeSerializer<Tuple3<Object, String, Object>> createSerializer(ExecutionConfig executionConfig) {
                final TypeSerializer[] typeSerializerArr = new TypeSerializer[getArity()];
                RichInt$.MODULE$.until$extension0(Predef$.MODULE$.intWrapper(0), getArity()).foreach$mVc$sp(i -> {
                    typeSerializerArr[i] = this.protected$types(this)[i].createSerializer(executionConfig);
                });
                new ScalaCaseClassSerializer<Tuple3<Object, String, Object>>(this, typeSerializerArr) { // from class: org.apache.flink.table.planner.plan.stream.sql.NonDeterministicDagTest$$anon$4$$anon$2
                    /* renamed from: createInstance, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
                    public Tuple3<Object, String, Object> m1184createInstance(Object[] objArr) {
                        return new Tuple3<>(BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(objArr[0])), (String) objArr[1], BoxesRunTime.boxToLong(BoxesRunTime.unboxToLong(objArr[2])));
                    }

                    {
                        Class typeClass = this.getTypeClass();
                    }
                };
                return new ScalaCaseClassSerializer(getTypeClass(), typeSerializerArr);
            }

            {
                super(Tuple3.class, (TypeInformation[]) new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), Nil$.MODULE$))).toArray((ClassTag) Predef$.MODULE$.implicitly(ClassTag$.MODULE$.apply(TypeInformation.class))), new $colon.colon(BasicTypeInfo.getInfoFor(Integer.TYPE), new $colon.colon(BasicTypeInfo.getInfoFor(String.class), new $colon.colon(BasicTypeInfo.getInfoFor(Long.TYPE), Nil$.MODULE$))), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"_1", "_2", "_3"})));
            }
        });
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table upsert_src (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_computed_col (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  `day` as DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd'),\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |create temporary table cdc_with_meta (\n        | a int,\n        | b bigint,\n        | c string,\n        | d boolean,\n        | metadata_1 int metadata,\n        | metadata_2 string metadata,\n        | metadata_3 bigint metadata,\n        | primary key (a) not enforced\n        |) with (\n        | 'connector' = 'values',\n        | 'changelog-mode' = 'I,UA,UB,D',\n        | 'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n        |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_watermark (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | op_ts timestamp_ltz(3),\n                               | primary key (a) not enforced,\n                               | watermark for op_ts as op_ts - interval '5' second\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'op_ts:timestamp_ltz(3)'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_meta_and_wm (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | op_ts timestamp_ltz(3) metadata,\n                               | primary key (a) not enforced,\n                               | watermark for op_ts as op_ts - interval '5' second\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'op_ts:timestamp_ltz(3)'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_with_composite_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d bigint,\n                               | primary key (a,d) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_with_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink_without_pk (\n                               | a int,\n                               | b bigint,\n                               | c string\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table dim_with_pk (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table dim_without_pk (\n                               | a int,\n                               | b bigint,\n                               | c string\n                               |) with (\n                               | 'connector' = 'values'\n                               |)")).stripMargin());
        util().tableEnv().createTemporaryFunction("ndFunc", new TestNonDeterministicUdf());
        util().tableEnv().createTemporaryFunction("ndTableFunc", new TestNonDeterministicUdtf());
        util().tableEnv().createTemporaryFunction("ndAggFunc", new TestNonDeterministicUdaf());
        util().tableEnv().createTemporaryFunction("str_split", new JavaUserDefinedTableFunctions.StringSplit());
    }

    @Test
    public void testCdcWithMetaSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testNonDeterministicProjectionWithSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("The column(s): d(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |select\n                                |  a,\n                                |  if(a > 100, b+d, b) as b,\n                                |  case when d > 100 then json_value(c, '$.count')\n                                |  else cast(b as string) || '#' end as c\n                                |from (\n                                |select a, b, c, d from (\n                                |  select *, row_number() over(partition by a order by d desc) as rn\n                                |  from (\n                                |    select a, d as b, c, ndFunc(b) as d from cdc\n                                |  ) tmp\n                                |) tmp where rn = 1) tmp\n                                |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaLegacySinkWithPk() {
        util().tableEnv().registerTableSinkInternal("legacy_upsert_sink", new TestingUpsertSink(this, new String[]{"a"}, new String[]{"a", "b", "c"}, new DataType[]{(DataType) DataTypes.INT().notNull(), DataTypes.BIGINT(), DataTypes.VARCHAR(100)}));
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into legacy_upsert_sink\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaLegacySinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().tableEnv().registerTableSinkInternal("legacy_retract_sink", util().createRetractTableSink(new String[]{"a", "b", "c"}, new LogicalType[]{new IntType(), new BigIntType(), VarCharType.STRING_TYPE}));
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into legacy_retract_sink\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaSinkWithCompositePk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_composite_pk\n                                 |select a, b, c, metadata_3\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaRenameSinkWithCompositePk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table cdc_with_meta_rename (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | d boolean,\n                               | metadata_3 bigint metadata,\n                               | e as metadata_3,\n                               | primary key (a) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D',\n                               | 'readable-metadata' = 'metadata_3:BIGINT'\n                               |)")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_composite_pk\n                                 |select a, b, c, e from cdc_with_meta_rename\n                                 |")).stripMargin());
    }

    @Test
    public void testSourceWithComputedColumnSinkWithPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, `day`\n                                 |from cdc_with_computed_col\n                                 |where b > 100\n                                 |")).stripMargin());
    }

    @Test
    public void testSourceWithComputedColumnMultiSink() {
        StatementSet createStatementSet = util().tableEnv().createStatementSet();
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink_without_pk\n                            |select a, sum(b), `day`\n                            |from cdc_with_computed_col\n                            |group by a, `day`\n                            |")).stripMargin());
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink_with_pk\n                            |select a, b, `day`\n                            |from cdc_with_computed_col\n                            |where b > 100\n                            |")).stripMargin());
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(createStatementSet);
    }

    @Test
    public void testCdcCorrelateNonDeterministicFuncSinkWithPK() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): EXPR$0(generated by non-deterministic function: ndTableFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  t1.a, t1.b, a1\n                                 |from cdc t1, lateral table(ndTableFunc(a)) as T(a1)\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcCorrelateNonDeterministicFuncNoLeftOutput() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): EXPR$0(generated by non-deterministic function: ndTableFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk(a)\n                                 |select\n                                 |  cast(a1 as integer) a\n                                 |from cdc t1, lateral table(ndTableFunc(a)) as T(a1)\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcCorrelateNonDeterministicFuncNoRightOutput() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, c\n                                 |from cdc t1 join lateral table(ndTableFunc(a)) as T(a1) on true\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcCorrelateOnNonDeterministicCondition() {
        thrown().expectMessage("unexpected correlate variable $cor0 in the plan");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, c\n                                 |from cdc t1 join lateral table(str_split(c)) as T(c1)\n                                 | -- the join predicate can only be empty or literal true for now\n                                 |  on ndFunc(b) > 100\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithMetaCorrelateSinkWithPk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_1' in cdc source may cause wrong result or error on downstream operators");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.metadata_1, T.c1\n                                 |from cdc_with_meta t1, lateral table(str_split(c)) as T(c1)\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithNonDeterministicFuncSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, ndFunc(b), c\n                                 |from cdc\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithNonDeterministicFuncSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): EXPR$1(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select a, ndFunc(b), c\n                                 |from cdc\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcWithNonDeterministicFilter() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t1.c\n                                 |from cdc t1\n                                 |where t1.b > UNIX_TIMESTAMP() - 300\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithoutPkSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_without_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcLeftJoinDimWithPkSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 left join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithoutPkSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_without_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkOnlySinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcLeftJoinDimWithoutPkSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_without_pk\n         |select t1.a, t1.b, t2.c\n         |from (\n         |  select *, proctime() proctime from cdc\n         |) t1 left join dim_without_pk for system_time as of t1.proctime as t2\n         |on t1.a = t2.a\n         |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkOutputNoPkSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t2.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkNonDeterministicFuncSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): a(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select ndFunc(t2.a) a, t1.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkNonDeterministicLocalCondition() {
        if (tryResolve()) {
            thrown().expectMessage("exists non deterministic function: 'ndFunc' in condition: '>(ndFunc($1), 100)' which may cause wrong result");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select t1.a, t1.b, t1.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a and ndFunc(t2.b) > 100\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimWithPkNonDeterministicLocalCondition2() {
        if (tryResolve()) {
            thrown().expectMessage("exists non deterministic function: 'UNIX_TIMESTAMP' in condition: '>($1, -(UNIX_TIMESTAMP(), 300))' which may cause wrong result");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t2.b as version, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |  -- check dim table data's freshness\n                                 |  and t2.b > UNIX_TIMESTAMP() - 300\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinDimNonDeterministicRemainingCondition() {
        if (tryResolve()) {
            thrown().expectMessage("exists non deterministic function: 'ndFunc' in condition: '>($1, ndFunc($3))' which may cause wrong result");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select t1.a, t2.b, t2.c\n                                 |from (\n                                 |  select *, proctime() proctime from cdc\n                                 |) t1 join dim_with_pk for system_time as of t1.proctime as t2\n                                 |on t1.a = t2.a\n                                 |  -- non deterministic function in remaining condition\n                                 |  and t1.b > ndFunc(t2.b)\n                                 |")).stripMargin());
    }

    @Test
    public void testGroupByNonDeterministicFuncWithCdcSource() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  a, count(*) cnt, `day`\n                                 |from (\n                                 |  select *, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') `day` from cdc\n                                 |) t\n                                 |group by `day`, a\n                                 |")).stripMargin());
    }

    @Test
    public void testGroupByNonDeterministicUdfWithCdcSource() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): EXPR$0(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  ndFunc(a), count(*) cnt, c\n                                 |from cdc\n                                 |group by ndFunc(a), c\n                                 |")).stripMargin());
    }

    @Test
    public void testNestedAggWithNonDeterministicGroupingKeys() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_pk\n         |select\n         |  a, sum(b) qmt, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') `day`\n         |from (\n         |  select *, row_number() over (partition by a order by PROCTIME() desc) rn from src\n         |) t\n         |where rn = 1\n         |group by a, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')\n         |")).stripMargin());
    }

    @Test
    public void testGroupAggNonDeterministicFuncOnSourcePk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select\n         |  `day`, count(*) cnt, sum(b) qmt\n         |from (\n         |  select *, concat(cast(a as varchar), DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) `day` from cdc\n         |) t\n         |group by `day`\n         |")).stripMargin());
    }

    @Test
    public void testAggWithNonDeterministicFilterArgs() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_pk\n         |select\n         |  a\n         |  ,count(*) cnt\n         |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n         |from T\n         |group by a\n         |")).stripMargin());
    }

    @Test
    public void testAggWithNonDeterministicFilterArgsOnCdcSource() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): $f2(generated by non-deterministic function: UNIX_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_pk\n         |select\n         |  a\n         |  ,count(*) cnt\n         |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n         |from cdc\n         |group by a\n         |")).stripMargin());
    }

    @Test
    public void testAggWithNonDeterministicFilterArgsOnCdcSourceSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): $f2(generated by non-deterministic function: UNIX_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_without_pk\n         |select\n         |  a\n         |  ,count(*) cnt\n         |  ,cast(count(distinct c) filter (where b > UNIX_TIMESTAMP() - 180) as varchar) valid_uv\n         |from cdc\n         |group by a\n         |")).stripMargin());
    }

    @Test
    public void testNonDeterministicAggOnAppendSourceSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  a\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |group by a\n                                 |")).stripMargin());
    }

    @Test
    public void testNonDeterministicAggOnAppendSourceSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): ndCnt(generated by non-deterministic function: ndAggFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select\n                                 |  a\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |group by a\n                                 |")).stripMargin());
    }

    @Test
    public void testGlobalNonDeterministicAggOnAppendSourceSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select\n                                 |  max(a)\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |")).stripMargin());
    }

    @Test
    public void testGlobalNonDeterministicAggOnAppendSourceSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): ndCnt(generated by non-deterministic function: ndAggFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select\n                                 |  max(a)\n                                 |  ,ndAggFunc(b) ndCnt\n                                 |  ,max(c) mc\n                                 |from T\n                                 |")).stripMargin());
    }

    @Test
    public void testUpsertSourceSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_pk\n                                 |select a, b, c\n                                 |from upsert_src\n                                 |")).stripMargin());
    }

    @Test
    public void testUpsertSourceSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select a, b, c\n                                 |from upsert_src\n                                 |")).stripMargin());
    }

    @Test
    public void testMultiOverWithNonDeterministicUdafSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_composite_pk\n        |SELECT\n        |  a\n        |  ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |  ,b\n        |  ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |FROM T1\n      ")).stripMargin());
    }

    @Test
    public void testOverWithNonDeterministicUdafSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |SELECT\n        |  a\n        |  ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)\n        |  ,b\n        |FROM T1\n      ")).stripMargin());
    }

    @Test
    public void testMultiOverWithNonDeterministicAggFilterSinkWithPk() {
        thrown().expectMessage("OVER must be applied to aggregate function");
        thrown().expect(ValidationException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_composite_pk\n        |SELECT\n        |  a\n        |  ,COUNT(distinct b) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |  ,b\n        |  ,SUM(a) filter (where b > UNIX_TIMESTAMP() - 180) OVER (PARTITION BY a ORDER BY proctime\n        |    ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |FROM T1\n      ")).stripMargin());
    }

    @Test
    public void testAppendRankOnMultiOverWithNonDeterministicUdafSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_composite_pk\n        |select a, uv, b, nd from (\n        | select\n        |  a, uv, b, nd,\n        |  row_number() over (partition by a order by uv desc) rn\n        | from (\n        |  SELECT\n        |    a\n        |    ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |    ,b\n        |    ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |  FROM T1\n        |  )\n        |) where rn = 1\n      ")).stripMargin());
    }

    @Test
    public void testAppendRankOnMultiOverWithNonDeterministicUdafSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |select a, nd, b from (\n        | select\n        |  a, uv, b, nd,\n        |  row_number() over (partition by a order by uv desc) rn\n        | from (\n        |  SELECT\n        |    a\n        |    ,COUNT(distinct b)  OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) uv\n        |    ,b\n        |    ,ndAggFunc(a) OVER (PARTITION BY a ORDER BY proctime\n        |      ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) nd\n        |  FROM T1\n        |  )\n        |) where rn = 1\n      ")).stripMargin());
    }

    @Test
    public void testUpdateRankOutputRowNumberSinkWithPk() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                | create temporary view v1 as\n                                |  select a, max(c) c, sum(b) filter (where b > 0) cnt\n                                |  from src\n                                |  group by a\n                                | ")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_composite_pk\n         |select a, cnt, c, rn from (\n         | select\n         |  a, cnt, c, row_number() over (partition by a order by cnt desc) rn\n         | from v1\n         | ) t where t.rn <= 100\n         |")).stripMargin());
    }

    @Test
    public void testRetractRankOutputRowNumberSinkWithPk() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                | create temporary view v1 as\n                                |  select a, max(c) c, sum(b) cnt\n                                |  from src\n                                |  group by a\n                                | ")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_composite_pk\n         |select a, cnt, c, rn from (\n         | select\n         |  a, cnt, c, row_number() over (partition by a order by cnt desc) rn\n         | from v1\n         | ) t where t.rn <= 100\n         |")).stripMargin());
    }

    @Test
    public void testUnionSinkWithCompositePk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_composite_pk\n                                 |select a, b, c, d\n                                 |from src\n                                 |union\n                                 |select a, b, c, metadata_3\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testUnionAllSinkWithCompositePk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_with_composite_pk\n                                 |select a, b, c, d\n                                 |from src\n                                 |union all\n                                 |select a, b, c, metadata_3\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testUnionAllSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("metadata column(s): 'metadata_3' in cdc source may cause wrong result or error");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select a, b, c\n                                 |from src\n                                 |union all\n                                 |select a, metadata_3, c\n                                 |from cdc_with_meta\n                                 |")).stripMargin());
    }

    @Test
    public void testCdcJoinWithNonDeterministicCondition() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): $f4(generated by non-deterministic function: ndFunc ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                 |insert into sink_without_pk\n                                 |select\n                                 |  t1.a\n                                 |  ,t2.b\n                                 |  ,t1.c\n                                 |from cdc t1 join cdc t2\n                                 |  on ndFunc(t1.b) = ndFunc(t2.b)\n                                 |")).stripMargin());
    }

    @Test
    public void testProctimeIntervalJoinSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |SELECT t2.a, t2.c, t1.b FROM T1 t1 JOIN T1 t2 ON\n                                |  t1.a = t2.a AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin());
    }

    @Test
    public void testCdcProctimeIntervalJoinOnPkSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |SELECT t2.a, t2.b, t1.c FROM (\n                                | select *, proctime() proctime from cdc) t1 JOIN\n                                | (select *, proctime() proctime from cdc) t2 ON\n                                |  t1.a = t2.a AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin());
    }

    @Test
    public void testCdcProctimeIntervalJoinOnNonPkSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("can not satisfy the determinism requirement");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n                                |insert into sink_without_pk\n                                |SELECT t2.a, t2.b, t1.c FROM (\n                                | select *, proctime() proctime from cdc) t1 JOIN\n                                | (select *, proctime() proctime from cdc) t2 ON\n                                |  t1.b = t2.b AND t1.proctime > t2.proctime - INTERVAL '5' SECOND\n      ")).stripMargin());
    }

    @Test
    public void testCdcRowtimeIntervalJoinSinkWithoutPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |SELECT t2.a, t1.b, t2.c FROM cdc_with_watermark t1 JOIN cdc_with_watermark t2 ON\n        |  t1.a = t2.a AND t1.op_ts > t2.op_ts - INTERVAL '5' SECOND\n      ")).stripMargin());
    }

    @Test
    public void testCdcRowtimeIntervalJoinSinkWithPk() {
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT t2.a, t1.b, t2.c FROM cdc_with_watermark t1 JOIN cdc_with_watermark t2 ON\n        |  t1.a = t2.a AND t1.op_ts > t2.op_ts - INTERVAL '5' SECOND\n      ")).stripMargin());
    }

    @Test
    public void testJoinKeyContainsUk() {
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t2.`c-day`, t2.b, t2.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.a = t2.a\n         |")).stripMargin());
    }

    @Test
    public void testJoinHasBothSidesUk() {
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t2.a, t2.`c-day`, t2.b, t2.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.b = t2.b\n         |")).stripMargin());
    }

    @Test
    public void testJoinHasBothSidesUkSinkWithoutPk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): c-day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink_with_pk\n         |select t1.a, t2.a, t2.`c-day`\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.b = t2.b\n         |")).stripMargin());
    }

    @Test
    public void testJoinHasSingleSideUk() {
        if (tryResolve()) {
            thrown().expectMessage("can not satisfy the determinism requirement");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t2.`c-day`, t2.b, t2.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |join (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         |) t2\n         |  on t1.b = t2.b\n         |")).stripMargin());
    }

    @Test
    public void testSemiJoinKeyContainsUk() {
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t1.`c-day`, t1.b, t1.d\n         |from (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         | ) t1\n         |where t1.a in (\n         |  select a from cdc where b > 100\n         |)\n         |")).stripMargin());
    }

    @Test
    public void testAntiJoinKeyContainsUk() {
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t1.`c-day`, t1.b, t1.d\n         |from (\n         |  select a, b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `c-day`, d\n         |  from cdc\n         | ) t1\n         |where t1.a not in (\n         |  select a from cdc where b > 100\n         |)\n         |")).stripMargin());
    }

    @Test
    public void testSemiJoinWithNonDeterministicConditionSingleSideHasUk() {
        if (tryResolve()) {
            thrown().expectMessage("column(s): c(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(new StringOps(Predef$.MODULE$.augmentString("\n         |select t1.a, t1.b, t1.c, t1.d\n         |from (\n         |  select a, b, c, d\n         |  from cdc\n         | ) t1\n         |where t1.c in (\n         |  select CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) c from cdc where b > 100\n         |)\n         |")).stripMargin());
    }

    @Test
    public void testCdcJoinWithNonDeterministicOutputSinkWithPk() {
        if (tryResolve()) {
            thrown().expectMessage("The column(s): logistics_time(generated by non-deterministic function: NOW ) can not satisfy the determinism requirement");
            thrown().expect(TableException.class);
        }
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_order (\n                                | order_id INT,\n                                | order_name STRING,\n                                | product_id INT,\n                                | user_id INT,\n                                | PRIMARY KEY(order_id) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values',\n                                | 'changelog-mode' = 'I,UA,UB,D'\n                                |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_logistics (\n                                | logistics_id INT,\n                                | logistics_target STRING,\n                                | logistics_source STRING,\n                                | logistics_time TIMESTAMP(0),\n                                | order_id INT,\n                                | PRIMARY KEY(logistics_id) NOT ENFORCED\n                                |) WITH (\n                                |  'connector' = 'values',\n                                | 'changelog-mode' = 'I,UA,UB,D'\n                                |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |CREATE TEMPORARY TABLE t_join_sink (\n                                | order_id INT,\n                                | order_name STRING,\n                                | logistics_id INT,\n                                | logistics_target STRING,\n                                | logistics_source STRING,\n                                | logistics_time timestamp,\n                                | PRIMARY KEY(order_id) NOT ENFORCED\n                                |) WITH (\n                                | 'connector' = 'values',\n                                | 'sink-insert-only' = 'false'\n                                |)")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n         |INSERT INTO t_join_sink\n         |SELECT ord.order_id,\n         |ord.order_name,\n         |logistics.logistics_id,\n         |logistics.logistics_target,\n         |logistics.logistics_source,\n         |now()\n         |FROM t_order AS ord\n         |LEFT JOIN t_logistics AS logistics ON ord.order_id=logistics.order_id\n         |")).stripMargin());
    }

    @Test
    public void testProctimeDedupOnCdcWithMetadataSinkWithPk() {
        thrown().expectMessage("StreamPhysicalDeduplicate doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT a, metadata_3, c\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum\n        |  FROM cdc_with_meta\n        |)\n        |WHERE rowNum = 1\n      ")).stripMargin());
    }

    @Test
    public void testProctimeDedupOnCdcWithMetadataSinkWithoutPk() {
        thrown().expectMessage("StreamPhysicalDeduplicate doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_without_pk\n        |SELECT a, metadata_3, c\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY PROCTIME() ASC) as rowNum\n        |  FROM cdc_with_meta\n        |)\n        |WHERE rowNum = 1\n      ")).stripMargin());
    }

    @Test
    public void testRowtimeDedupOnCdcWithMetadataSinkWithPk() {
        thrown().expectMessage("StreamPhysicalDeduplicate doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT a, b, c\n        |FROM (\n        |  SELECT *,\n        |    ROW_NUMBER() OVER (PARTITION BY a ORDER BY op_ts ASC) as rowNum\n        |  FROM cdc_with_meta_and_wm\n        |)\n        |WHERE rowNum = 1\n      ")).stripMargin());
    }

    @Test
    public void testWindowDedupOnCdcWithMetadata() {
        thrown().expectMessage("StreamPhysicalWindowDeduplicate doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               | a int,\n                               | b bigint,\n                               | c string,\n                               | ts timestamp(3),\n                               | primary key (a,ts) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |INSERT INTO sink1\n        |SELECT a, b, c, window_start\n        |FROM (\n        |SELECT *,\n        |   ROW_NUMBER() OVER(PARTITION BY a, window_start, window_end\n        |   ORDER BY op_ts DESC) as rownum\n        |FROM TABLE(TUMBLE(TABLE cdc_with_meta_and_wm, DESCRIPTOR(op_ts), INTERVAL '1' MINUTE))\n        |)\n        |WHERE rownum <= 1")).stripMargin());
    }

    @Test
    public void testNestedSourceWithMultiSink() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |CREATE TABLE nested_src (\n         |  id int,\n         |  deepNested row<nested1 row<name string, `value` int>,\n         |    nested2 row<num int, flag boolean>>,\n         |  name string,\n         |  metadata_1 int metadata,\n         |  metadata_2 string metadata,\n         |  primary key(id, name) not enforced\n         |) WITH (\n         |  'connector' = 'values',\n         |  'nested-projection-supported' = 'true',\n         |  'changelog-mode' = 'I,UA,UB,D',\n         |  'readable-metadata' = 'metadata_1:INT, metadata_2:STRING, metadata_3:BIGINT'\n         |)\n         |")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n        |create view v1 as\n        |SELECT id,\n        |       deepNested.nested2.num AS a,\n        |       deepNested.nested1.name AS name,\n        |       deepNested.nested1.`value` + deepNested.nested2.num + metadata_1 as b\n        |FROM nested_src\n        |")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               |  a int,\n                               |  b string,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink2 (\n                               |  a int,\n                               |  b string,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        StatementSet createStatementSet = util().tableEnv().createStatementSet();
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n         |insert into sink1\n         |select a, `day`, sum(b)\n         |from (select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day` from v1) t\n         |group by a, `day`\n         |")).stripMargin());
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink2\n                            |select a, name, b\n                            |from v1\n                            |where b > 100\n                            |")).stripMargin());
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(createStatementSet);
    }

    @Test
    public void testMultiSinkOnJoinedView() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src1 (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table src2 (\n                               |  a int,\n                               |  b bigint,\n                               |  c string,\n                               |  d int,\n                               |  primary key(a, c) not enforced\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'changelog-mode' = 'I,UA,UB,D'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink1 (\n                               |  a int,\n                               |  b string,\n                               |  c bigint,\n                               |  d bigint\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                               |create temporary table sink2 (\n                               |  a int,\n                               |  b string,\n                               |  c bigint,\n                               |  d string\n                               |) with (\n                               | 'connector' = 'values',\n                               | 'sink-insert-only' = 'false'\n                               |)")).stripMargin());
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n         |create temporary view v1 as\n         |select\n         |  t1.a as a, t1.`day` as `day`, t2.b as b, t2.c as c\n         |from (\n         |  select a, b, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd') as `day`\n         |  from src1\n         | ) t1\n         |join (\n         |  select b, CONCAT(c, DATE_FORMAT(CURRENT_TIMESTAMP, 'yyMMdd')) as `day`, c, d\n         |  from src2\n         |) t2\n         | on t1.a = t2.d\n         |")).stripMargin());
        StatementSet createStatementSet = util().tableEnv().createStatementSet();
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink1\n                            |select a, `day`, sum(b), count(distinct c)\n                            |from v1\n                            |group by a, `day`\n                            |")).stripMargin());
        createStatementSet.addInsertSql(new StringOps(Predef$.MODULE$.augmentString("\n                            |insert into sink2\n                            |select a, `day`, b, c\n                            |from v1\n                            |where b > 100\n                            |")).stripMargin());
        if (tryResolve()) {
            thrown().expectMessage("column(s): day(generated by non-deterministic function: CURRENT_TIMESTAMP ) can not satisfy the determinism");
            thrown().expect(TableException.class);
        }
        util().verifyExecPlan(createStatementSet);
    }

    @Test
    public void testMatchRecognizeSinkWithPk() {
        util().tableEnv().executeSql(new StringOps(Predef$.MODULE$.augmentString("\n                                |create temporary view v1 as\n                                |select *, PROCTIME() as proctime from src\n                                |")).stripMargin());
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT T1.a, T1.b, cast(T1.matchProctime as varchar)\n        |FROM v1\n        |MATCH_RECOGNIZE (\n        |PARTITION BY c\n        |ORDER BY proctime\n        |MEASURES\n        |  A.a as a,\n        |  A.b as b,\n        |  MATCH_PROCTIME() as matchProctime\n        |ONE ROW PER MATCH\n        |PATTERN (A)\n        |DEFINE\n        |  A AS A.a > 1\n        |) AS T1\n        |")).stripMargin());
    }

    @Test
    public void testMatchRecognizeWithNonDeterministicConditionOnCdcSinkWithPk() {
        thrown().expectMessage("Match Recognize doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT T.a, T.b, cast(T.matchRowtime as varchar)\n        |FROM cdc_with_meta_and_wm\n        |MATCH_RECOGNIZE (\n        |PARTITION BY c\n        |ORDER BY op_ts\n        |MEASURES\n        |  A.a as a,\n        |  A.b as b,\n        |  MATCH_ROWTIME(op_ts) as matchRowtime\n        |ONE ROW PER MATCH\n        |PATTERN (A)\n        |DEFINE\n        |  A AS A.op_ts >= CURRENT_TIMESTAMP\n        |) AS T\n      ")).stripMargin());
    }

    @Test
    public void testMatchRecognizeOnCdcWithMetaDataSinkWithPk() {
        thrown().expectMessage("Match Recognize doesn't support consuming update and delete changes");
        thrown().expect(TableException.class);
        util().verifyExecPlanInsert(new StringOps(Predef$.MODULE$.augmentString("\n        |insert into sink_with_pk\n        |SELECT T.a, T.b, cast(T.ts as varchar)\n        |FROM cdc_with_meta_and_wm\n        |MATCH_RECOGNIZE (\n        |PARTITION BY c\n        |ORDER BY op_ts\n        |MEASURES\n        |  A.a as a,\n        |  A.b as b,\n        |  A.op_ts as ts,\n        |  MATCH_ROWTIME(op_ts) as matchRowtime\n        |ONE ROW PER MATCH\n        |PATTERN (A)\n        |DEFINE\n        |  A AS A.a > 0\n        |) AS T\n      ")).stripMargin());
    }

    public NonDeterministicDagTest(OptimizerConfigOptions.NonDeterministicUpdateStrategy nonDeterministicUpdateStrategy) {
        this.nonDeterministicUpdateStrategy = nonDeterministicUpdateStrategy;
        OptimizerConfigOptions.NonDeterministicUpdateStrategy nonDeterministicUpdateStrategy2 = OptimizerConfigOptions.NonDeterministicUpdateStrategy.TRY_RESOLVE;
        this.tryResolve = nonDeterministicUpdateStrategy != null ? nonDeterministicUpdateStrategy.equals(nonDeterministicUpdateStrategy2) : nonDeterministicUpdateStrategy2 == null;
    }
}
