package org.apache.flink.table.planner.runtime.stream.jsonplan;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.planner.utils.JsonPlanTestBase;
import org.apache.flink.table.planner.utils.JsonTestUtils;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/stream/jsonplan/ConfigureOperatorLevelStateTtlJsonITCase.class */
/**
 * Integration tests that compile an {@code INSERT INTO ... SELECT} statement to a JSON plan,
 * rewrite the state metadata of selected exec nodes in that JSON, and then execute the rewritten
 * plan and compare the sink contents with a fixed expectation.
 *
 * <p>Both tests read from unbounded test sources that are configured with the {@code
 * source.sleep-after-elements} and {@code source.sleep-time} options, so the expected results
 * presumably depend on how the arrival spacing of the rows compares with the configured state TTL
 * values (TODO confirm against {@code TestValuesTableFactory} and {@code JsonTestUtils}).
 */
public class ConfigureOperatorLevelStateTtlJsonITCase extends JsonPlanTestBase {

    /**
     * Assigns different state TTL values to two different single-input exec nodes ({@code
     * stream-exec-deduplicate} and {@code stream-exec-group-aggregate}) of the same plan.
     *
     * <p>The source contains exact duplicates of the Tom, Olivia and Jerry orders. The expected
     * sink content lists {@code +I[Jerry, 1, 2, 99.9]} twice; presumably the second Jerry row
     * arrives after both the deduplicate state and the aggregate state for Jerry have expired, so
     * it is neither dropped as a duplicate nor merged into the earlier aggregate (NOTE(review):
     * inferred from the data and the TTL values, confirm).
     *
     * @throws Exception if compiling, executing or awaiting the plan fails
     */
    @Test
    public void testDifferentStateTtlForDifferentOneInputOperator() throws Exception {
        String dataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9d),
                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9d),
                                GenericRowData.of(1, StringData.fromString("Tom"), 1, 199.9d),
                                GenericRowData.of(3, StringData.fromString("Tom"), 1, 29.9d),
                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100.0d),
                                GenericRowData.of(4, StringData.fromString("Olivia"), 1, 100.0d),
                                GenericRowData.of(2, StringData.fromString("Jerry"), 2, 99.9d),
                                GenericRowData.of(5, StringData.fromString("Michael"), 3, 599.9d),
                                GenericRowData.of(6, StringData.fromString("Olivia"), 3, 1000.0d)));
        createTestSourceTable(
                "Orders",
                new String[] {
                    "`order_id` INT", "`buyer` STRING", "`quantity` INT", "`amount` DOUBLE"
                },
                null,
                getProperties(dataId, 1, "2s"));

        createTestNonInsertOnlyValuesSinkTable(
                "OrdersStats",
                "`buyer` STRING",
                "`ord_cnt` BIGINT",
                "`quantity_cnt` BIGINT",
                "`total_amount` DOUBLE");

        compileSqlAndExecutePlan(
                        "INSERT INTO OrdersStats \n"
                                + "SELECT buyer, COUNT(1) AS ord_cnt, SUM(quantity) AS quantity_cnt, SUM(amount) AS total_amount FROM (\n"
                                + "SELECT *, ROW_NUMBER() OVER(PARTITION BY order_id, buyer, quantity, amount ORDER BY proctime() ASC) AS rk FROM Orders) tmp\n"
                                + "WHERE rk = 1\n"
                                + "GROUP BY buyer",
                        json ->
                                overrideStateTtl(
                                        json,
                                        new StateTtlOverride("stream-exec-deduplicate", 0, 6000L),
                                        new StateTtlOverride(
                                                "stream-exec-group-aggregate", 0, 9000L)))
                .await();

        assertResult(
                Arrays.asList(
                        "+I[Tom, 2, 2, 229.8]",
                        "+I[Jerry, 1, 2, 99.9]",
                        "+I[Jerry, 1, 2, 99.9]",
                        "+I[Olivia, 2, 4, 1100.0]",
                        "+I[Michael, 1, 3, 599.9]"),
                TestValuesTableFactory.getResults("OrdersStats"));
    }

    /**
     * Assigns different state TTL values to state index 0 and state index 1 of the same {@code
     * stream-exec-join} exec node.
     *
     * <p>Presumably index 0 is the state of the left join input ({@code Orders}) and index 1 the
     * state of the right join input ({@code LineOrders}); the two sources are also throttled
     * differently. Only three of the six matching key pairs are expected in the sink
     * (NOTE(review): the index-to-input mapping is not visible in this file, confirm).
     *
     * @throws Exception if compiling, executing or awaiting the plan fails
     */
    @Test
    public void testDifferentStateTtlForSameTwoInputStreamOperator() throws Exception {
        String ordersDataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(1, 1000001),
                                GenericRowData.of(1, 1000002),
                                GenericRowData.of(1, 1000003),
                                GenericRowData.of(1, 1000004),
                                GenericRowData.of(1, 1000005),
                                GenericRowData.of(2, 2000001)));
        createTestSourceTable(
                "Orders",
                new String[] {"`order_id` INT", "`line_order_id` INT"},
                null,
                getProperties(ordersDataId, 1, "2s"));

        String lineOrdersDataId =
                TestValuesTableFactory.registerRowData(
                        Arrays.asList(
                                GenericRowData.of(2000001, StringData.fromString("TRUCK")),
                                GenericRowData.of(1000005, StringData.fromString("AIR")),
                                GenericRowData.of(1000001, StringData.fromString("SHIP")),
                                GenericRowData.of(1000002, StringData.fromString("TRUCK")),
                                GenericRowData.of(1000003, StringData.fromString("RAIL")),
                                GenericRowData.of(1000004, StringData.fromString("RAIL"))));
        createTestSourceTable(
                "LineOrders",
                new String[] {"`line_order_id` INT", "`ship_mode` STRING"},
                null,
                getProperties(lineOrdersDataId, 2, "4s"));

        createTestValuesSinkTable(
                "OrdersShipInfo", "`order_id` INT", "`line_order_id` INT", "`ship_mode` STRING");

        compileSqlAndExecutePlan(
                        "INSERT INTO OrdersShipInfo \n"
                                + "SELECT a.order_id, a.line_order_id, b.ship_mode "
                                + "FROM Orders a JOIN LineOrders b ON a.line_order_id = b.line_order_id",
                        json ->
                                overrideStateTtl(
                                        json,
                                        new StateTtlOverride("stream-exec-join", 0, 3000L),
                                        new StateTtlOverride("stream-exec-join", 1, 9000L)))
                .await();

        assertResult(
                Arrays.asList(
                        "+I[1, 1000002, TRUCK]", "+I[1, 1000004, RAIL]", "+I[1, 1000005, AIR]"),
                TestValuesTableFactory.getResults("OrdersShipInfo"));
    }

    /**
     * Parses the compiled plan JSON, applies every override in the given order through {@link
     * JsonTestUtils#setExecNodeStateMetadata}, and serializes the plan back to a string.
     *
     * @param planJson the compiled plan as a JSON string
     * @param overrides the state metadata changes to apply, in order
     * @return the rewritten plan as a JSON string
     * @throws TableException if reading or writing the plan JSON raises an {@link IOException}
     */
    private static String overrideStateTtl(String planJson, StateTtlOverride... overrides) {
        try {
            JsonNode plan = JsonTestUtils.readFromString(planJson);
            for (StateTtlOverride ttlOverride : overrides) {
                JsonTestUtils.setExecNodeStateMetadata(
                        plan,
                        ttlOverride.execNodeType,
                        ttlOverride.stateIndex,
                        ttlOverride.stateTtl);
            }
            return JsonTestUtils.writeToString(plan);
        } catch (IOException e) {
            // The plan transformer lambda cannot throw checked exceptions, so wrap and keep the cause.
            throw new TableException("Cannot modify compiled json plan.", e);
        }
    }

    /**
     * Builds the option map for an unbounded test source backed by previously registered row data.
     *
     * @param dataId the id returned by {@code TestValuesTableFactory.registerRowData}
     * @param sleepAfterElements value for {@code source.sleep-after-elements}; presumably the
     *     number of rows emitted between pauses (TODO confirm)
     * @param sleepTime value for {@code source.sleep-time}, e.g. {@code "2s"}
     * @return a new mutable map holding the connector options
     */
    private static Map<String, String> getProperties(
            String dataId, int sleepAfterElements, String sleepTime) {
        // A plain HashMap avoids the anonymous subclass created by double-brace initialization.
        Map<String, String> properties = new HashMap<>();
        properties.put("connector", TestValuesTableFactory.IDENTIFIER);
        properties.put("bounded", "false");
        properties.put("register-internal-data", "true");
        properties.put("source.sleep-after-elements", String.valueOf(sleepAfterElements));
        properties.put("source.sleep-time", sleepTime);
        properties.put("data-id", dataId);
        return properties;
    }

    /**
     * Immutable description of one state metadata change: the exec node type to look up in the
     * plan, the index of the state within that node, and the TTL value to set (presumably
     * milliseconds; the unit is defined by {@code JsonTestUtils}, confirm there).
     */
    private static final class StateTtlOverride {
        private final String execNodeType;
        private final int stateIndex;
        private final long stateTtl;

        private StateTtlOverride(String execNodeType, int stateIndex, long stateTtl) {
            this.execNodeType = execNodeType;
            this.stateIndex = stateIndex;
            this.stateTtl = stateTtl;
        }
    }
}
