package org.apache.flink.table.planner.plan.nodes.exec.common;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.legacy.table.connector.source.SourceFunctionProvider;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.factories.TableFactoryHarness;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase.class */
public class CommonExecSinkITCase {
    private static final int PARALLELISM = 4;

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());

    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    private StreamExecutionEnvironment env;

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$RecordWriter.class */
    private static class RecordWriter extends TestSinkV2.DefaultSinkWriter<RowData> {
        private final SharedReference<List<RowData>> rows;

        private RecordWriter(SharedReference<List<RowData>> sharedReference) {
            this.rows = sharedReference;
        }

        public void write(RowData rowData, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.rows, rowData);
            super.write(rowData, context);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TestSource.class */
    private static class TestSource extends TableFactoryHarness.ScanSourceBase {
        private final List<Row> rows;

        private TestSource(List<Row> list) {
            super(false);
            this.rows = list;
        }

        @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.ScanSourceBase
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext scanContext) {
            return SourceFunctionProvider.of(new TestSourceFunction(this.rows, scanContext.createDataStructureConverter(getFactoryContext().getPhysicalRowDataType())), false);
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TestSourceFunction.class */
    private static class TestSourceFunction implements SourceFunction<RowData> {
        private final List<Row> rows;
        private final DynamicTableSource.DataStructureConverter converter;

        public TestSourceFunction(List<Row> list, DynamicTableSource.DataStructureConverter dataStructureConverter) {
            this.rows = list;
            this.converter = dataStructureConverter;
        }

        public void run(SourceFunction.SourceContext<RowData> sourceContext) throws Exception {
            Stream<R> map = this.rows.stream().map(row -> {
                return (RowData) this.converter.toInternal(row);
            });
            Objects.requireNonNull(sourceContext);
            map.forEach((v1) -> {
                r1.collect(v1);
            });
        }

        public void cancel() {
        }
    }

    /* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSinkITCase$TestTimestampWriter.class */
    private static class TestTimestampWriter extends TestSinkV2.DefaultSinkWriter<RowData> {
        private final SharedReference<List<Long>> timestamps;

        private TestTimestampWriter(SharedReference<List<Long>> sharedReference) {
            this.timestamps = sharedReference;
        }

        public void write(RowData rowData, SinkWriter.Context context) {
            CommonExecSinkITCase.addElement(this.timestamps, context.timestamp());
            super.write(rowData, context);
        }
    }

    CommonExecSinkITCase() {
    }

    @BeforeEach
    void before() {
        this.env = StreamExecutionEnvironment.getExecutionEnvironment();
        this.env.setParallelism(PARALLELISM);
    }

    @Test
    void testStreamRecordTimestampInserterSinkRuntimeProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        SharedReference add = this.sharedObjects.add(new ArrayList());
        List asList = Arrays.asList(Row.of(new Object[]{1, "foo", Instant.parse("2020-11-10T12:34:56.123Z")}), Row.of(new Object[]{2, "foo", Instant.parse("2020-11-10T11:34:56.789Z")}), Row.of(new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaStreamRecordTimestampInserter(true)).source(new TestSource(asList)).sink(buildRuntimeSinkProvider(new TestTimestampWriter(add))).build());
        assertPlan(create, "INSERT INTO T1 SELECT * FROM T1", true);
        create.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        assertTimestampResults(add, asList);
    }

    @Test
    void testStreamRecordTimestampInserterDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        final SharedReference add = this.sharedObjects.add(new ArrayList());
        List asList = Arrays.asList(Row.of(new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of(new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of(new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), "foo", Instant.parse("2020-11-11T10:11:23.888Z")}));
        final SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.1
            public void invoke(RowData rowData, SinkFunction.Context context) {
                CommonExecSinkITCase.addElement(add, context.timestamp());
            }
        };
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaStreamRecordTimestampInserter(true)).source(new TestSource(asList)).sink(new TableFactoryHarness.SinkBase() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.2
            @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.SinkBase
            /* renamed from: getSinkRuntimeProvider, reason: merged with bridge method [inline-methods] */
            public DataStreamSinkProvider mo846getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.2.1
                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        return dataStream.addSink(sinkFunction);
                    }
                };
            }
        }).build());
        assertPlan(create, "INSERT INTO T1 SELECT * FROM T1", true);
        create.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        Collections.sort((List) add.get());
        assertTimestampResults(add, asList);
    }

    @Test
    void testStreamRecordTimestampInserterNotApplied() {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaStreamRecordTimestampInserter(false)).source(new TestSource(Arrays.asList(Row.of(new Object[]{1, "foo", Instant.parse("2020-11-10T11:34:56.123Z")}), Row.of(new Object[]{2, "foo", Instant.parse("2020-11-10T12:34:56.789Z")}), Row.of(new Object[]{3, "foo", Instant.parse("2020-11-11T10:11:22.777Z")}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), "foo", Instant.parse("2020-11-11T10:11:23.888Z")})))).sink(buildRuntimeSinkProvider(new TestTimestampWriter(this.sharedObjects.add(new ArrayList())))).build());
        assertPlan(create, "INSERT INTO T1 SELECT * FROM T1", false);
    }

    @Test
    void testUnifiedSinksAreUsableWithDataStreamSinkProvider() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        SharedReference<List<RowData>> add = this.sharedObjects.add(new ArrayList());
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(Schema.newBuilder().column("a", DataTypes.INT()).build()).source(new TestSource(Arrays.asList(Row.of(new Object[]{1}), Row.of(new Object[]{2})))).sink(buildDataStreamSinkProvider(add)).build());
        create.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        List list = (List) ((List) add.get()).stream().map(rowData -> {
            return Integer.valueOf(rowData.getInt(0));
        }).sorted().collect(Collectors.toList());
        Assertions.assertThat(((Integer) list.get(0)).intValue()).isEqualTo(1);
        Assertions.assertThat(((Integer) list.get(1)).intValue()).isEqualTo(2);
    }

    @Test
    void testCharLengthEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(Row.of(new Object[]{1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"}), Row.of(new Object[]{2, "Apache", "SQL", 22, 222, "Flink"}), Row.of(new Object[]{3, "Apache", "Flink", 33, 333, "Apache Flink SQL"}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), "Flink Project", "SQL or SeQueL?", 44, 444, "Apache Flink SQL"}));
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaForCharLengthEnforcer()).source(new TestSource(asList)).build());
        TableResult executeSql = create.executeSql("SELECT * FROM T1");
        executeSql.await();
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = executeSql.collect();
        Objects.requireNonNull(arrayList);
        collect.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        Assertions.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(asList);
        try {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
            TableResult executeSql2 = create.executeSql("SELECT * FROM T1");
            executeSql2.await();
            List asList2 = Arrays.asList(Row.of(new Object[]{1, "Apache F", "SQL Ru", 11, 111, "SQL"}), Row.of(new Object[]{2, "Apache  ", "SQL   ", 22, 222, "Flink"}), Row.of(new Object[]{3, "Apache  ", "Flink ", 33, 333, "Apache"}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), "Flink Pr", "SQL or", 44, 444, "Apache"}));
            ArrayList arrayList2 = new ArrayList();
            CloseableIterator collect2 = executeSql2.collect();
            Objects.requireNonNull(arrayList2);
            collect2.forEachRemaining((v1) -> {
                r1.add(v1);
            });
            Assertions.assertThat(arrayList2).containsExactlyInAnyOrderElementsOf(asList2);
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
        } catch (Throwable th) {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
            throw th;
        }
    }

    @Test
    void testBinaryLengthEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(Row.of(new Object[]{1, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8, 9, 10, 11, 12}, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8}, 11, 111, new byte[]{1, 2, 3}}), Row.of(new Object[]{2, new byte[]{1, 2, 3, PARALLELISM, 5}, new byte[]{1, 2, 3}, 22, 222, new byte[]{1, 2, 3, PARALLELISM, 5, 6}}), Row.of(new Object[]{3, new byte[]{1, 2, 3, PARALLELISM, 5, 6}, new byte[]{1, 2, 3, PARALLELISM, 5}, 33, 333, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8}}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8}, new byte[]{1, 2, 3, PARALLELISM, 5, 6}, 44, 444, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8, 9, 10}}));
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaForBinaryLengthEnforcer()).source(new TestSource(asList)).build());
        TableResult executeSql = create.executeSql("SELECT * FROM T1");
        executeSql.await();
        ArrayList arrayList = new ArrayList();
        CloseableIterator collect = executeSql.collect();
        Objects.requireNonNull(arrayList);
        collect.forEachRemaining((v1) -> {
            r1.add(v1);
        });
        Assertions.assertThat(arrayList).containsExactlyInAnyOrderElementsOf(asList);
        try {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.TRIM_PAD.name());
            TableResult executeSql2 = create.executeSql("SELECT * FROM T1");
            executeSql2.await();
            List asList2 = Arrays.asList(Row.of(new Object[]{1, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8}, new byte[]{1, 2, 3, PARALLELISM, 5, 6}, 11, 111, new byte[]{1, 2, 3}}), Row.of(new Object[]{2, new byte[]{1, 2, 3, PARALLELISM, 5, 0, 0, 0}, new byte[]{1, 2, 3, 0, 0, 0}, 22, 222, new byte[]{1, 2, 3, PARALLELISM, 5, 6}}), Row.of(new Object[]{3, new byte[]{1, 2, 3, PARALLELISM, 5, 6, 0, 0}, new byte[]{1, 2, 3, PARALLELISM, 5, 0}, 33, 333, new byte[]{1, 2, 3, PARALLELISM, 5, 6}}), Row.of(new Object[]{Integer.valueOf(PARALLELISM), new byte[]{1, 2, 3, PARALLELISM, 5, 6, 7, 8}, new byte[]{1, 2, 3, PARALLELISM, 5, 6}, 44, 444, new byte[]{1, 2, 3, PARALLELISM, 5, 6}}));
            ArrayList arrayList2 = new ArrayList();
            CloseableIterator collect2 = executeSql2.collect();
            Objects.requireNonNull(arrayList2);
            collect2.forEachRemaining((v1) -> {
                r1.add(v1);
            });
            Assertions.assertThat(arrayList2).containsExactlyInAnyOrderElementsOf(asList2);
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
        } catch (Throwable th) {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER.key(), ExecutionConfigOptions.TypeLengthEnforcer.IGNORE.name());
            throw th;
        }
    }

    @Test
    void testNullEnforcer() throws ExecutionException, InterruptedException {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        List asList = Arrays.asList(Row.of(new Object[]{1, "Apache", 11}), Row.of(new Object[]{2, null, 22}), Row.of(new Object[]{null, "Flink", 33}), Row.of(new Object[]{null, null, 44}));
        SharedReference add = this.sharedObjects.add(new ArrayList());
        create.createTable("T1", TableFactoryHarness.newBuilder().m112schema(schemaForNotNullEnforcer()).source(new TestSource(asList)).sink(buildRuntimeSinkProvider(new RecordWriter(add))).build());
        Assertions.assertThatThrownBy(() -> {
            create.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
        }).isInstanceOf(ExecutionException.class).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches("Column 'b' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='DROP' to suppress this exception and drop such records silently.")});
        ((List) add.get()).clear();
        Assertions.assertThatThrownBy(() -> {
            create.executeSql("INSERT INTO T1(a, b) SELECT (a, b) FROM T1").await();
        }).isInstanceOf(ValidationException.class).hasMessage("SQL validation failed. At line 0, column 0: Column 'c' has no default value and does not allow NULLs");
        try {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(), ExecutionConfigOptions.NotNullEnforcer.DROP.name());
            ((List) add.get()).clear();
            create.executeSql("INSERT INTO T1 SELECT * FROM T1").await();
            Assertions.assertThat(((List) add.get()).size()).isEqualTo(2);
            Assertions.assertThat(((RowData) ((List) add.get()).get(0)).getInt(0)).isEqualTo(1);
            Assertions.assertThat(((RowData) ((List) add.get()).get(0)).getString(1).toString()).isEqualTo("Apache");
            Assertions.assertThat(((RowData) ((List) add.get()).get(0)).getInt(2)).isEqualTo(11);
            Assertions.assertThat(((RowData) ((List) add.get()).get(1)).isNullAt(0)).isTrue();
            Assertions.assertThat(((RowData) ((List) add.get()).get(1)).getString(1).toString()).isEqualTo("Flink");
            Assertions.assertThat(((RowData) ((List) add.get()).get(1)).getInt(2)).isEqualTo(33);
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(), ExecutionConfigOptions.NotNullEnforcer.ERROR.name());
        } catch (Throwable th) {
            create.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_SINK_NOT_NULL_ENFORCER.key(), ExecutionConfigOptions.NotNullEnforcer.ERROR.name());
            throw th;
        }
    }

    @Test
    void testFromValuesWatermarkPropagation() throws Exception {
        StreamTableEnvironment create = StreamTableEnvironment.create(this.env);
        final SharedReference add = this.sharedObjects.add(new ArrayList());
        final SinkFunction<RowData> sinkFunction = new SinkFunction<RowData>() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.3
            public void writeWatermark(Watermark watermark) {
                CommonExecSinkITCase.addElement(add, Long.valueOf(watermark.getTimestamp()));
            }
        };
        create.fromValues(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("a", DataTypes.INT())}), new Object[]{Row.of(new Object[]{1}), Row.of(new Object[]{2}), Row.of(new Object[]{3})}).executeInsert(TableFactoryHarness.newBuilder().sink(new TableFactoryHarness.SinkBase() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.4
            @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.SinkBase
            /* renamed from: getSinkRuntimeProvider, reason: merged with bridge method [inline-methods] */
            public DataStreamSinkProvider mo846getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.4.1
                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        return dataStream.addSink(sinkFunction);
                    }
                };
            }
        }).build()).await();
        Assertions.assertThat(((List) add.get()).size()).isEqualTo(this.env.getParallelism());
        Iterator it = ((List) add.get()).iterator();
        while (it.hasNext()) {
            Assertions.assertThat((Long) it.next()).isEqualTo(org.apache.flink.streaming.api.watermark.Watermark.MAX_WATERMARK.getTimestamp());
        }
    }

    private static <T> void addElement(SharedReference<List<T>> sharedReference, T t) {
        sharedReference.applySync(list -> {
            return Boolean.valueOf(list.add(t));
        });
    }

    private static TestSinkV2<RowData> buildRecordWriterTestSink(TestSinkV2.DefaultSinkWriter<RowData> defaultSinkWriter) {
        return TestSinkV2.newBuilder().setWriter(defaultSinkWriter).build();
    }

    private TableFactoryHarness.SinkBase buildRuntimeSinkProvider(final TestSinkV2.DefaultSinkWriter<RowData> defaultSinkWriter) {
        return new TableFactoryHarness.SinkBase() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.5
            @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.SinkBase
            /* renamed from: getSinkRuntimeProvider */
            public DynamicTableSink.SinkRuntimeProvider mo846getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return SinkV2Provider.of(CommonExecSinkITCase.buildRecordWriterTestSink(defaultSinkWriter));
            }
        };
    }

    @Nullable
    private TableFactoryHarness.SinkBase buildDataStreamSinkProvider(final SharedReference<List<RowData>> sharedReference) {
        return new TableFactoryHarness.SinkBase() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.6
            @Override // org.apache.flink.table.planner.factories.TableFactoryHarness.SinkBase
            /* renamed from: getSinkRuntimeProvider, reason: merged with bridge method [inline-methods] */
            public DataStreamSinkProvider mo846getSinkRuntimeProvider(DynamicTableSink.Context context) {
                return new DataStreamSinkProvider() { // from class: org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSinkITCase.6.1
                    public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
                        return dataStream.sinkTo(CommonExecSinkITCase.buildRecordWriterTestSink(new RecordWriter(sharedReference)));
                    }
                };
            }
        };
    }

    private static void assertPlan(StreamTableEnvironment streamTableEnvironment, String str, boolean z) {
        String explainSql = streamTableEnvironment.explainSql(str, new ExplainDetail[]{ExplainDetail.JSON_EXECUTION_PLAN});
        if (z) {
            Assertions.assertThat(explainSql).contains(new CharSequence[]{"StreamRecordTimestampInserter(rowtime field: 2"});
        } else {
            Assertions.assertThat(explainSql).doesNotContain(new CharSequence[]{"StreamRecordTimestampInserter(rowtime field: 2"});
        }
    }

    private static Schema schemaStreamRecordTimestampInserter(boolean z) {
        Schema.Builder column = Schema.newBuilder().column("a", "INT").column("b", "STRING").column("ts", "TIMESTAMP_LTZ(3)");
        if (z) {
            column.watermark("ts", "ts");
        }
        return column.build();
    }

    private static Schema schemaForCharLengthEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "CHAR(8)").column("c", "CHAR(6)").column("d", "INT").column("e", "INT").column("f", "VARCHAR(6)").build();
    }

    private static Schema schemaForBinaryLengthEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "BINARY(8)").column("c", "BINARY(6)").column("d", "INT").column("e", "INT").column("f", "VARBINARY(6)").build();
    }

    private static Schema schemaForNotNullEnforcer() {
        return Schema.newBuilder().column("a", "INT").column("b", "STRING NOT NULL").column("c", "INT NOT NULL").build();
    }

    private static void assertTimestampResults(SharedReference<List<Long>> sharedReference, List<Row> list) {
        Assertions.assertThat((List) sharedReference.get()).hasSize(list.size());
        for (int i = 0; i < list.size(); i++) {
            Assertions.assertThat(Instant.ofEpochMilli(((Long) ((List) sharedReference.get()).get(i)).longValue())).isEqualTo(list.get(i).getField(2));
        }
    }
}
