package org.apache.flink.table.planner.plan.nodes.exec.serde;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.StateMetadata;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;

@Execution(ExecutionMode.CONCURRENT)
/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/serde/StateMetadataTest.class */
public class StateMetadataTest {
    private TableConfig tableConfig;

    @BeforeEach
    public void beforeEach() {
        this.tableConfig = TableConfig.getDefault();
    }

    @CsvSource({"0,0hour,fooState", "1,600000ms,barState", "2,10minute,meowState"})
    @ParameterizedTest
    public void testStateMetadataSerde(int i, String str, String str2) throws IOException {
        JsonSerdeTestUtil.testJsonRoundTrip(new StateMetadata(i, str, str2), StateMetadata.class);
    }

    @CsvSource(value = {"{\"index\":0,\"name\":\"fooState\"}|state ttl should not be null", "{\"index\":-1,\"ttl\":\"3600000ms\",\"name\":\"barState\"}|state index should start from 0", "{\"ttl\":\"3600000ms\",\"index\":1}|state name should not be null"}, delimiterString = "|")
    @ParameterizedTest
    public void testDeserializeFromMalformedJson(String str, String str2) {
        Assertions.assertThatThrownBy(() -> {
        }).hasMessageContaining(str2);
    }

    @MethodSource({"provideConfigForOneInput"})
    @ParameterizedTest
    public void testGetOneInputOperatorDefaultMeta(Consumer<TableConfig> consumer, String str, long j) {
        consumer.accept(this.tableConfig);
        List oneInputOperatorDefaultMeta = StateMetadata.getOneInputOperatorDefaultMeta(this.tableConfig, str);
        Assertions.assertThat(oneInputOperatorDefaultMeta).hasSize(1);
        Assertions.assertThat(oneInputOperatorDefaultMeta.get(0)).isEqualTo(new StateMetadata(0, Duration.ofMillis(j), str));
    }

    @MethodSource({"provideConfigForMultiInput"})
    @ParameterizedTest
    public void testGetMultiInputOperatorDefaultMeta(Consumer<TableConfig> consumer, List<String> list, List<Long> list2) {
        consumer.accept(this.tableConfig);
        List multiInputOperatorDefaultMeta = StateMetadata.getMultiInputOperatorDefaultMeta(this.tableConfig, (String[]) list.toArray(new String[0]));
        Assertions.assertThat(multiInputOperatorDefaultMeta).hasSameSizeAs(list);
        IntStream.range(0, multiInputOperatorDefaultMeta.size()).forEach(i -> {
            Assertions.assertThat(multiInputOperatorDefaultMeta.get(i)).isEqualTo(new StateMetadata(i, Duration.ofMillis(((Long) list2.get(i)).longValue()), (String) list.get(i)));
        });
    }

    @MethodSource({"provideStateMetaForOneInput"})
    @ParameterizedTest
    public void testGetStateTtlForOneInputOperator(Function<TableConfig, ExecNodeConfig> function, @Nullable List<StateMetadata> list, long j) {
        Assertions.assertThat(StateMetadata.getStateTtlForOneInputOperator(function.apply(this.tableConfig), list)).isEqualTo(j);
    }

    @MethodSource({"provideStateMetaForMultiInput"})
    @ParameterizedTest
    public void testGetStateTtlForMultiInputOperator(Function<TableConfig, ExecNodeConfig> function, @Nullable List<StateMetadata> list, List<Long> list2) {
        Assertions.assertThat(StateMetadata.getStateTtlForMultiInputOperator(function.apply(this.tableConfig), list2.size(), list)).containsExactlyElementsOf(list2);
    }

    @MethodSource({"provideMalformedStateMeta"})
    @ParameterizedTest
    public void testGetStateTtlFromInvalidStateMeta(int i, List<StateMetadata> list, String str) {
        Assertions.assertThatThrownBy(() -> {
            StateMetadata.getStateTtlForMultiInputOperator(ExecNodeConfig.ofTableConfig(this.tableConfig, true), i, list);
        }).isInstanceOf(IllegalArgumentException.class).hasMessageContaining(str);
    }

    public static Stream<Arguments> provideConfigForOneInput() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{tableConfig -> {
        }, "fooState", 0L}), Arguments.of(new Object[]{tableConfig2 -> {
            tableConfig2.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofMinutes(10L));
        }, "barState", 600000L})});
    }

    public static Stream<Arguments> provideConfigForMultiInput() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{tableConfig -> {
        }, Arrays.asList("fooState", "barState"), Stream.generate(() -> {
            return 0L;
        }).limit(2L).collect(Collectors.toList())}), Arguments.of(new Object[]{tableConfig2 -> {
            tableConfig2.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofDays(1L));
        }, Arrays.asList("firstState", "secondState", "thirdState", "fourthState"), Stream.generate(() -> {
            return 86400000L;
        }).limit(4L).collect(Collectors.toList())})});
    }

    public static Stream<Arguments> provideStateMetaForOneInput() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{tableConfig -> {
            return ExecNodeConfig.ofTableConfig(tableConfig, true);
        }, null, 0L}), Arguments.of(new Object[]{tableConfig2 -> {
            tableConfig2.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofDays(1L));
            return ExecNodeConfig.ofTableConfig(tableConfig2, true);
        }, Collections.emptyList(), 86400000L}), Arguments.of(new Object[]{tableConfig3 -> {
            return ExecNodeConfig.ofTableConfig(tableConfig3, true);
        }, Collections.singletonList(new StateMetadata(0, Duration.ofMillis(3600000L), "fooState")), 3600000L}), Arguments.of(new Object[]{tableConfig4 -> {
            tableConfig4.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofDays(1L));
            return ExecNodeConfig.ofTableConfig(tableConfig4, true);
        }, Collections.singletonList(new StateMetadata(0, Duration.ofMillis(172800000L), "barState")), 172800000L})});
    }

    public static Stream<Arguments> provideStateMetaForMultiInput() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{tableConfig -> {
            return ExecNodeConfig.ofTableConfig(tableConfig, true);
        }, null, Stream.generate(() -> {
            return 0L;
        }).limit(2L).collect(Collectors.toList())}), Arguments.of(new Object[]{tableConfig2 -> {
            tableConfig2.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofDays(1L));
            return ExecNodeConfig.ofTableConfig(tableConfig2, true);
        }, Collections.emptyList(), Stream.generate(() -> {
            return 86400000L;
        }).limit(3L).collect(Collectors.toList())}), Arguments.of(new Object[]{tableConfig3 -> {
            return ExecNodeConfig.ofTableConfig(tableConfig3, true);
        }, Arrays.asList(new StateMetadata(1, Duration.ofMillis(86400000L), "fooState"), new StateMetadata(0, Duration.ofMillis(3600000L), "barState")), Arrays.asList(3600000L, 86400000L)}), Arguments.of(new Object[]{tableConfig4 -> {
            tableConfig4.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofMinutes(30L));
            return ExecNodeConfig.ofTableConfig(tableConfig4, true);
        }, Arrays.asList(new StateMetadata(1, Duration.ofMillis(86400000L), "fooState"), new StateMetadata(0, Duration.ofMillis(3600000L), "barState"), new StateMetadata(2, Duration.ofMillis(3600000L), "meowState")), Arrays.asList(3600000L, 86400000L, 3600000L)})});
    }

    public static Stream<Arguments> provideMalformedStateMeta() {
        return Stream.of((Object[]) new Arguments[]{Arguments.of(new Object[]{1, Arrays.asList(new StateMetadata(1, Duration.ofMillis(60000L), "fooState"), new StateMetadata(3, Duration.ofMillis(3600000L), "barState")), "Received 2 state meta for a OneInputStreamOperator."}), Arguments.of(new Object[]{2, Collections.singletonList(new StateMetadata(1, Duration.ofMillis(60000L), "fooState")), "Received 1 state meta for a TwoInputStreamOperator."}), Arguments.of(new Object[]{3, Collections.singletonList(new StateMetadata(0, Duration.ofMillis(60000L), "fooState")), "Received 1 state meta for a MultipleInputStreamOperator."}), Arguments.of(new Object[]{2, Arrays.asList(new StateMetadata(0, Duration.ofMillis(60000L), "fooState"), new StateMetadata(0, Duration.ofMillis(3600000L), "barState")), "The state index should not contain duplicates and start from 0 (inclusive) and monotonically increase to the input size (exclusive) of the operator."}), Arguments.of(new Object[]{2, Arrays.asList(new StateMetadata(1, Duration.ofMillis(3600000L), "barState"), new StateMetadata(3, Duration.ofMillis(3600000L), "barState")), "The state index should not contain duplicates and start from 0 (inclusive) and monotonically increase to the input size (exclusive) of the operator."})});
    }
}
