package org.apache.flink.streaming.util;

import java.time.Duration;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest.class */
class AbstractStreamOperatorTestHarnessTest {

    /* loaded from: input_file:org/apache/flink/streaming/util/AbstractStreamOperatorTestHarnessTest$SideOutputTypeInformationTestFunction.class */
    private static class SideOutputTypeInformationTestFunction extends ProcessFunction<Integer, Integer> {
        private final OutputTag<Integer> outputTag;

        SideOutputTypeInformationTestFunction(OutputTag<Integer> outputTag) {
            this.outputTag = outputTag;
        }

        public void processElement(Integer num, ProcessFunction<Integer, Integer>.Context context, Collector<Integer> collector) throws Exception {
            context.output(this.outputTag, num);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, ProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Integer) obj, (ProcessFunction<Integer, Integer>.Context) context, (Collector<Integer>) collector);
        }
    }

    AbstractStreamOperatorTestHarnessTest() {
    }

    @Test
    void testInitializeAfterOpenning() throws Throwable {
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) new AbstractStreamOperator<Integer>() { // from class: org.apache.flink.streaming.util.AbstractStreamOperatorTestHarnessTest.1
        }, 1, 1, 0);
        abstractStreamOperatorTestHarness.setup();
        abstractStreamOperatorTestHarness.open();
        Assertions.assertThatThrownBy(() -> {
            abstractStreamOperatorTestHarness.initializeState(OperatorSubtaskState.builder().build());
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("TestHarness has already been initialized.");
    }

    @Test
    void testSetTtlTimeProvider() throws Exception {
        AbstractStreamOperator<Integer> abstractStreamOperator = new AbstractStreamOperator<Integer>() { // from class: org.apache.flink.streaming.util.AbstractStreamOperatorTestHarnessTest.2
        };
        AbstractStreamOperatorTestHarness abstractStreamOperatorTestHarness = new AbstractStreamOperatorTestHarness((StreamOperator) abstractStreamOperator, 1, 1, 0);
        try {
            abstractStreamOperatorTestHarness.config.setStateKeySerializer(IntSerializer.INSTANCE);
            abstractStreamOperatorTestHarness.config.serializeAllConfigs();
            Duration ofHours = Duration.ofHours(1L);
            abstractStreamOperatorTestHarness.initializeState(OperatorSubtaskState.builder().build());
            abstractStreamOperatorTestHarness.open();
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("test", IntSerializer.INSTANCE);
            valueStateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(ofHours).build());
            KeyedStateBackend keyedStateBackend = abstractStreamOperator.getKeyedStateBackend();
            ValueState partitionedState = keyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, valueStateDescriptor);
            keyedStateBackend.setCurrentKey(1);
            abstractStreamOperatorTestHarness.setStateTtlProcessingTime(0L);
            partitionedState.update(42);
            Assertions.assertThat((Integer) partitionedState.value()).isEqualTo(42);
            abstractStreamOperatorTestHarness.setStateTtlProcessingTime(ofHours.toMillis() + 1);
            Assertions.assertThat((Integer) partitionedState.value()).isNull();
            abstractStreamOperatorTestHarness.close();
        } catch (Throwable th) {
            try {
                abstractStreamOperatorTestHarness.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    void testSideOutputTypeInformation() throws Throwable {
        TypeSerializer typeSerializer = (TypeSerializer) Mockito.spy(TypeSerializer.class);
        TypeInformation typeInformation = (TypeInformation) Mockito.spy(Types.INT);
        Mockito.when(typeInformation.createSerializer((SerializerConfig) ArgumentMatchers.any(SerializerConfigImpl.class))).thenReturn(typeSerializer);
        OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator) new ProcessOperator(new SideOutputTypeInformationTestFunction(new OutputTag("test", typeInformation))));
        oneInputStreamOperatorTestHarness.setup();
        oneInputStreamOperatorTestHarness.open();
        oneInputStreamOperatorTestHarness.processElement(12, 1000L);
        ((TypeSerializer) Mockito.verify(typeSerializer, Mockito.times(1))).copy(Integer.valueOf(ArgumentMatchers.eq(12)));
    }
}
