package org.apache.flink.table.catalog;

import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/catalog/SchemaResolutionTest.class */
class SchemaResolutionTest {
    private static final String INVALID_WATERMARK_SQL = "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND";
    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> {
        return COMPUTED_SQL;
    });
    private static final ResolvedExpression WATERMARK_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP(3), () -> {
        return WATERMARK_SQL;
    });
    private static final ResolvedExpression INVALID_WATERMARK_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> {
        return INVALID_WATERMARK_SQL;
    });
    private static final ResolvedExpression PROCTIME_RESOLVED = new ResolvedExpressionMock(TypeConversions.fromLogicalToDataType(new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3)), () -> {
        return PROCTIME_SQL;
    });
    private static final String COMPUTED_SQL = "orig_ts - INTERVAL '60' MINUTE";
    private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND";
    private static final String PROCTIME_SQL = "PROCTIME()";
    private static final Schema SCHEMA = Schema.newBuilder().primaryKeyNamed("primary_constraint", new String[]{"id"}).column("id", DataTypes.INT().notNull()).withComment("people id").column("counter", DataTypes.INT().notNull()).column("payload", "ROW<name STRING, age INT, flag BOOLEAN>").columnByMetadata("topic", DataTypes.STRING(), true).withComment("kafka topic").columnByExpression("ts", Expressions.callSql(COMPUTED_SQL)).withComment("rowtime").columnByMetadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp").withComment("the 'origin' timestamp").watermark("ts", WATERMARK_SQL).columnByExpression("proctime", PROCTIME_SQL).build();
    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> {
        return COMPUTED_SQL_WITH_TS_LTZ;
    });
    private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ(3), () -> {
        return WATERMARK_SQL_WITH_TS_LTZ;
    });
    private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE";
    private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND";
    private static final Schema SCHEMA_WITH_TS_LTZ = Schema.newBuilder().column("id", DataTypes.INT().notNull()).columnByExpression("ts1", Expressions.callSql(COMPUTED_SQL_WITH_TS_LTZ)).columnByMetadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp").watermark("ts1", WATERMARK_SQL_WITH_TS_LTZ).build();

    SchemaResolutionTest() {
    }

    @Test
    void testSchemaResolution() {
        ResolvedSchema resolvedSchema = new ResolvedSchema(Arrays.asList(Column.physical("id", DataTypes.INT().notNull()).withComment("people id"), Column.physical("counter", DataTypes.INT().notNull()), Column.physical("payload", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT()), DataTypes.FIELD("flag", DataTypes.BOOLEAN())})), Column.metadata("topic", DataTypes.STRING(), (String) null, true).withComment("kafka topic"), Column.computed("ts", COMPUTED_COLUMN_RESOLVED).withComment("rowtime"), Column.metadata("orig_ts", DataTypes.TIMESTAMP(3), "timestamp", false).withComment("the 'origin' timestamp"), Column.computed("proctime", PROCTIME_RESOLVED)), Collections.singletonList(WatermarkSpec.of("ts", WATERMARK_RESOLVED)), UniqueConstraint.primaryKey("primary_constraint", Collections.singletonList("id")));
        ResolvedSchema resolveSchema = resolveSchema(SCHEMA, true);
        Assertions.assertThat(resolveSchema).isEqualTo(resolvedSchema);
        Assertions.assertThat(LogicalTypeChecks.isRowtimeAttribute(getType(resolveSchema, "ts"))).isTrue();
        Assertions.assertThat(LogicalTypeChecks.isProctimeAttribute(getType(resolveSchema, "proctime"))).isTrue();
        ResolvedSchema resolveSchema2 = resolveSchema(SCHEMA, false);
        Assertions.assertThat(resolveSchema2).isEqualTo(resolvedSchema);
        Assertions.assertThat(LogicalTypeChecks.isRowtimeAttribute(getType(resolveSchema2, "ts"))).isFalse();
        Assertions.assertThat(LogicalTypeChecks.isProctimeAttribute(getType(resolveSchema2, "proctime"))).isTrue();
    }

    @Test
    void testSchemaResolutionWithTimestampLtzRowtime() {
        ResolvedSchema resolvedSchema = new ResolvedSchema(Arrays.asList(Column.physical("id", DataTypes.INT().notNull()), Column.computed("ts1", COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ), Column.metadata("ts_ltz", DataTypes.TIMESTAMP_LTZ(3), "timestamp", false)), Collections.singletonList(WatermarkSpec.of("ts1", WATERMARK_RESOLVED_WITH_TS_LTZ)), (UniqueConstraint) null);
        ResolvedSchema resolveSchema = resolveSchema(SCHEMA_WITH_TS_LTZ, true);
        Assertions.assertThat(resolveSchema).isEqualTo(resolvedSchema);
        Assertions.assertThat(LogicalTypeChecks.isRowtimeAttribute(getType(resolveSchema, "ts1"))).isTrue();
        ResolvedSchema resolveSchema2 = resolveSchema(SCHEMA_WITH_TS_LTZ, false);
        Assertions.assertThat(resolveSchema2).isEqualTo(resolvedSchema);
        Assertions.assertThat(LogicalTypeChecks.isRowtimeAttribute(getType(resolveSchema2, "ts1"))).isFalse();
    }

    @Test
    void testSchemaResolutionWithSourceWatermark() {
        Assertions.assertThat(resolveSchema(Schema.newBuilder().column("ts_ltz", DataTypes.TIMESTAMP_LTZ(1)).watermark("ts_ltz", Expressions.sourceWatermark()).build())).isEqualTo(new ResolvedSchema(Collections.singletonList(Column.physical("ts_ltz", DataTypes.TIMESTAMP_LTZ(1))), Collections.singletonList(WatermarkSpec.of("ts_ltz", CallExpression.permanent(BuiltInFunctionDefinitions.SOURCE_WATERMARK, Collections.emptyList(), DataTypes.TIMESTAMP_LTZ(1)))), (UniqueConstraint) null));
    }

    @Test
    void testSchemaResolutionErrors() {
        testError(Schema.newBuilder().fromSchema(SCHEMA).column("id", DataTypes.STRING()).build(), "Schema must not contain duplicate column names.");
        testError(Schema.newBuilder().columnByExpression("invalid", Expressions.callSql("INVALID")).build(), "Invalid expression for computed column 'invalid'.");
        testError(Schema.newBuilder().columnByMetadata("metadata", DataTypes.INT()).columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false).build(), "The column `metadata` and `from_metadata` in the table are both from the same metadata key 'metadata'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.");
        testError(Schema.newBuilder().columnByMetadata("from_metadata", DataTypes.BIGINT(), "metadata", false).columnByMetadata("from_metadata2", DataTypes.STRING(), "metadata", true).build(), "The column `from_metadata` and `from_metadata2` in the table are both from the same metadata key 'metadata'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.");
        testError(Schema.newBuilder().column("ts", DataTypes.BOOLEAN()).watermark("ts", Expressions.callSql(WATERMARK_SQL)).build(), "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BOOLEAN");
        testError(Schema.newBuilder().column("ts", DataTypes.TIMESTAMP(3)).watermark("ts", Expressions.callSql("INVALID")).build(), "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");
        testError(Schema.newBuilder().column("ts", DataTypes.TIMESTAMP(3)).watermark("ts", Expressions.callSql(INVALID_WATERMARK_SQL)).build(), "The watermark declaration's output data type 'TIMESTAMP_LTZ(3)' is different from the time field's data type 'TIMESTAMP(3)'.");
        testError(Schema.newBuilder().column("ts", DataTypes.TIMESTAMP(3)).watermark("other_ts", Expressions.callSql(WATERMARK_SQL)).build(), "Invalid column name 'other_ts' for rowtime attribute in watermark declaration. Available columns are: [ts]");
        testError(Schema.newBuilder().fromSchema(SCHEMA).watermark("orig_ts", WATERMARK_SQL).build(), "Multiple watermark definitions are not supported yet.");
        testError(Schema.newBuilder().columnByExpression("ts", PROCTIME_SQL).watermark("ts", WATERMARK_SQL).build(), "A watermark can not be defined for a processing-time attribute.");
        testError(Schema.newBuilder().column("id", DataTypes.INT()).primaryKey(new String[]{"INVALID"}).build(), "Column 'INVALID' does not exist.");
        testError(Schema.newBuilder().column("nullable_col", DataTypes.INT()).primaryKey(new String[]{"nullable_col"}).build(), "Column 'nullable_col' is nullable.");
        testError(Schema.newBuilder().column("orig_ts", DataTypes.TIMESTAMP(3)).columnByExpression("ts", COMPUTED_SQL).primaryKey(new String[]{"ts"}).build(), "Column 'ts' is not a physical column.");
        testError(Schema.newBuilder().column("id", DataTypes.INT()).primaryKey(new String[]{"id", "id"}).build(), "Invalid primary key 'PK_id_id'. A primary key must not contain duplicate columns. Found: [id]");
    }

    @Test
    void testUnresolvedSchemaString() {
        Assertions.assertThat(SCHEMA.toString()).isEqualTo("(\n  `id` INT NOT NULL COMMENT 'people id',\n  `counter` INT NOT NULL,\n  `payload` [ROW<name STRING, age INT, flag BOOLEAN>],\n  `topic` METADATA VIRTUAL COMMENT 'kafka topic',\n  `ts` AS [orig_ts - INTERVAL '60' MINUTE] COMMENT 'rowtime',\n  `orig_ts` METADATA FROM 'timestamp' COMMENT 'the ''origin'' timestamp',\n  `proctime` AS [PROCTIME()],\n  WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n)");
    }

    @Test
    void testResolvedSchemaString() {
        Assertions.assertThat(resolveSchema(SCHEMA).toString()).isEqualTo("(\n  `id` INT NOT NULL COMMENT 'people id',\n  `counter` INT NOT NULL,\n  `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>,\n  `topic` STRING METADATA VIRTUAL COMMENT 'kafka topic',\n  `ts` TIMESTAMP(3) *ROWTIME* AS orig_ts - INTERVAL '60' MINUTE COMMENT 'rowtime',\n  `orig_ts` TIMESTAMP(3) METADATA FROM 'timestamp' COMMENT 'the ''origin'' timestamp',\n  `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n)");
    }

    @Test
    void testGeneratedConstraintName() {
        Assertions.assertThat(((Schema.UnresolvedPrimaryKey) Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING()).column("c", DataTypes.STRING()).primaryKey(new String[]{"b", "a"}).build().getPrimaryKey().orElseThrow(IllegalStateException::new)).getConstraintName()).isEqualTo("PK_b_a");
    }

    @Test
    void testSinkRowDataType() {
        Assertions.assertThat(resolveSchema(SCHEMA).toSinkRowDataType()).isEqualTo(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.INT().notNull()), DataTypes.FIELD("counter", DataTypes.INT().notNull()), DataTypes.FIELD("payload", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT()), DataTypes.FIELD("flag", DataTypes.BOOLEAN())})), DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3))}).notNull());
    }

    @Test
    void testPhysicalRowDataType() {
        ResolvedSchema resolveSchema = resolveSchema(SCHEMA);
        DataType notNull = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.INT().notNull()), DataTypes.FIELD("counter", DataTypes.INT().notNull()), DataTypes.FIELD("payload", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT()), DataTypes.FIELD("flag", DataTypes.BOOLEAN())}))}).notNull();
        DataType physicalRowDataType = resolveSchema.toPhysicalRowDataType();
        Assertions.assertThat(physicalRowDataType).isEqualTo(notNull);
        Assertions.assertThat(resolveSchema(Schema.newBuilder().fromRowDataType(physicalRowDataType).build()).toPhysicalRowDataType()).isEqualTo(physicalRowDataType);
    }

    @Test
    void testSourceRowDataType() {
        ResolvedSchema resolveSchema = resolveSchema(SCHEMA);
        DataType notNull = DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("id", DataTypes.INT().notNull()), DataTypes.FIELD("counter", DataTypes.INT().notNull()), DataTypes.FIELD("payload", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("age", DataTypes.INT()), DataTypes.FIELD("flag", DataTypes.BOOLEAN())})), DataTypes.FIELD("topic", DataTypes.STRING()), DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), DataTypes.FIELD("orig_ts", DataTypes.TIMESTAMP(3)), DataTypes.FIELD("proctime", DataTypes.TIMESTAMP_LTZ(3).notNull())}).notNull();
        DataType sourceRowDataType = resolveSchema.toSourceRowDataType();
        Assertions.assertThat(sourceRowDataType).isEqualTo(notNull);
        Assertions.assertThat(LogicalTypeChecks.isTimeAttribute(((DataType) sourceRowDataType.getChildren().get(4)).getLogicalType())).isFalse();
        Assertions.assertThat(LogicalTypeChecks.isTimeAttribute(((DataType) sourceRowDataType.getChildren().get(6)).getLogicalType())).isFalse();
    }

    private static void testError(Schema schema, String str) {
        testError(schema, str, true);
    }

    private static void testError(Schema schema, String str, boolean z) {
        Assertions.assertThatThrownBy(() -> {
            resolveSchema(schema, z);
        }).hasMessageContaining(str);
    }

    private static ResolvedSchema resolveSchema(Schema schema) {
        return resolveSchema(schema, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ResolvedSchema resolveSchema(Schema schema, boolean z) {
        return new DefaultSchemaResolver(z, new DataTypeFactoryMock(), ExpressionResolverMocks.forSqlExpression(SchemaResolutionTest::resolveSqlExpression)).resolve(schema);
    }

    private static ResolvedExpression resolveSqlExpression(String str, RowType rowType, @Nullable LogicalType logicalType) {
        boolean z = -1;
        switch (str.hashCode()) {
            case -290388684:
                if (str.equals(COMPUTED_SQL_WITH_TS_LTZ)) {
                    z = true;
                    break;
                }
                break;
            case 543204149:
                if (str.equals(WATERMARK_SQL_WITH_TS_LTZ)) {
                    z = 3;
                    break;
                }
                break;
            case 913772609:
                if (str.equals(INVALID_WATERMARK_SQL)) {
                    z = 5;
                    break;
                }
                break;
            case 1295630793:
                if (str.equals(COMPUTED_SQL)) {
                    z = false;
                    break;
                }
                break;
            case 1327305572:
                if (str.equals(PROCTIME_SQL)) {
                    z = 4;
                    break;
                }
                break;
            case 1464389730:
                if (str.equals(WATERMARK_SQL)) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                Assertions.assertThat(getType(rowType, "orig_ts")).isEqualTo(DataTypes.TIMESTAMP(3).getLogicalType());
                return COMPUTED_COLUMN_RESOLVED;
            case true:
                Assertions.assertThat(getType(rowType, "ts_ltz")).isEqualTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType());
                return COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ;
            case true:
                Assertions.assertThat(getType(rowType, "ts")).isEqualTo(DataTypes.TIMESTAMP(3).getLogicalType());
                return WATERMARK_RESOLVED;
            case true:
                Assertions.assertThat(getType(rowType, "ts1")).isEqualTo(DataTypes.TIMESTAMP_LTZ(3).getLogicalType());
                return WATERMARK_RESOLVED_WITH_TS_LTZ;
            case true:
                return PROCTIME_RESOLVED;
            case true:
                return INVALID_WATERMARK_RESOLVED;
            default:
                throw new UnsupportedOperationException("Unknown SQL expression.");
        }
    }

    private static LogicalType getType(ResolvedSchema resolvedSchema, String str) {
        return ((Column) resolvedSchema.getColumn(str).orElseThrow(IllegalStateException::new)).getDataType().getLogicalType();
    }

    private static LogicalType getType(RowType rowType, String str) {
        return rowType.getTypeAt(rowType.getFieldIndex(str));
    }
}
