package org.apache.flink.streaming.api.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.DefaultKeyedStateStoreV2;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.util.CollectorOutput;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamingRuntimeContextTest.class */
class StreamingRuntimeContextTest {
    StreamingRuntimeContextTest() {
    }

    @Test
    void testValueStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.getSerializerConfig().registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getState(new ValueStateDescriptor("name", TaskInfo.class));
        KryoSerializer serializer = ((StateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testReducingStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.getSerializerConfig().registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getReducingState(new ReducingStateDescriptor("name", (ReduceFunction) Mockito.mock(ReduceFunction.class), TaskInfo.class));
        KryoSerializer serializer = ((StateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testAggregatingStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.getSerializerConfig().registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getAggregatingState(new AggregatingStateDescriptor("name", (AggregateFunction) Mockito.mock(AggregateFunction.class), TaskInfo.class));
        KryoSerializer serializer = ((AggregatingStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testListStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.getSerializerConfig().registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getListState(new ListStateDescriptor("name", TaskInfo.class));
        ListStateDescriptor listStateDescriptor = (ListStateDescriptor) atomicReference.get();
        Assertions.assertThat(listStateDescriptor.getSerializer()).isInstanceOf(ListSerializer.class);
        KryoSerializer elementSerializer = listStateDescriptor.getElementSerializer();
        Assertions.assertThat(elementSerializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(elementSerializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testListStateReturnsEmptyListByDefault() throws Exception {
        Iterable iterable = (Iterable) createRuntimeContext().getListState(new ListStateDescriptor("name", String.class)).get();
        Assertions.assertThat(iterable).isNotNull();
        Assertions.assertThat(iterable.iterator()).isExhausted();
    }

    @Test
    void testMapStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.getSerializerConfig().registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getMapState(new MapStateDescriptor("name", String.class, TaskInfo.class));
        KryoSerializer valueSerializer = ((MapStateDescriptor) atomicReference.get()).getValueSerializer();
        Assertions.assertThat(valueSerializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(valueSerializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testMapStateReturnsEmptyMapByDefault() throws Exception {
        Iterable entries = createMapOperatorRuntimeContext().getMapState(new MapStateDescriptor("name", Integer.class, String.class)).entries();
        Assertions.assertThat(entries).isNotNull();
        Assertions.assertThat(entries.iterator()).isExhausted();
    }

    @Test
    void testV2ValueStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        SerializerConfig serializerConfig = executionConfig.getSerializerConfig();
        serializerConfig.registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getValueState(new org.apache.flink.runtime.state.v2.ValueStateDescriptor("name", TypeInformation.of(TaskInfo.class), serializerConfig));
        KryoSerializer serializer = ((org.apache.flink.runtime.state.v2.ValueStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testV2ListStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        SerializerConfig serializerConfig = executionConfig.getSerializerConfig();
        serializerConfig.registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getListState(new org.apache.flink.runtime.state.v2.ListStateDescriptor("name", TypeInformation.of(TaskInfo.class), serializerConfig));
        KryoSerializer serializer = ((org.apache.flink.runtime.state.v2.ListStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testV2MapStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        SerializerConfig serializerConfig = executionConfig.getSerializerConfig();
        serializerConfig.registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getMapState(new org.apache.flink.runtime.state.v2.MapStateDescriptor("name", TypeInformation.of(String.class), TypeInformation.of(TaskInfo.class), serializerConfig));
        KryoSerializer serializer = ((org.apache.flink.runtime.state.v2.MapStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testV2ReducingStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        SerializerConfig serializerConfig = executionConfig.getSerializerConfig();
        serializerConfig.registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getReducingState(new org.apache.flink.runtime.state.v2.ReducingStateDescriptor("name", (ReduceFunction) Mockito.mock(ReduceFunction.class), TypeInformation.of(TaskInfo.class), serializerConfig));
        KryoSerializer serializer = ((org.apache.flink.runtime.state.v2.ReducingStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    @Test
    void testV2AggregatingStateInstantiation() throws Exception {
        ExecutionConfig executionConfig = new ExecutionConfig();
        SerializerConfig serializerConfig = executionConfig.getSerializerConfig();
        serializerConfig.registerKryoType(Path.class);
        AtomicReference<Object> atomicReference = new AtomicReference<>();
        createRuntimeContext(atomicReference, executionConfig).getAggregatingState(new org.apache.flink.runtime.state.v2.AggregatingStateDescriptor("name", (AggregateFunction) Mockito.mock(AggregateFunction.class), TypeInformation.of(TaskInfo.class), serializerConfig));
        KryoSerializer serializer = ((org.apache.flink.runtime.state.v2.AggregatingStateDescriptor) atomicReference.get()).getSerializer();
        Assertions.assertThat(serializer).isInstanceOf(KryoSerializer.class);
        Assertions.assertThat(serializer.getKryo().getRegistration(Path.class).getId()).isPositive();
    }

    private StreamingRuntimeContext createMapOperatorRuntimeContext() throws Exception {
        return createRuntimeContext(createMapPlainMockOp());
    }

    private StreamingRuntimeContext createRuntimeContext() throws Exception {
        return new StreamingRuntimeContext(createListPlainMockOp(), MockEnvironment.builder().build(), Collections.emptyMap());
    }

    private StreamingRuntimeContext createRuntimeContext(AtomicReference<Object> atomicReference, ExecutionConfig executionConfig) throws Exception {
        return createDescriptorCapturingMockOp(atomicReference, executionConfig, MockEnvironment.builder().setExecutionConfig(executionConfig).build()).getRuntimeContext();
    }

    private StreamingRuntimeContext createRuntimeContext(AbstractStreamOperator<?> abstractStreamOperator) {
        return new StreamingRuntimeContext(MockEnvironment.builder().build(), Collections.emptyMap(), abstractStreamOperator.getMetricGroup(), abstractStreamOperator.getOperatorID(), abstractStreamOperator.getProcessingTimeService(), abstractStreamOperator.getKeyedStateStore(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
    }

    private static AbstractStreamOperator<?> createDescriptorCapturingMockOp(AtomicReference<Object> atomicReference, final ExecutionConfig executionConfig, Environment environment) throws Exception {
        AbstractStreamOperator<?> abstractStreamOperator = new AbstractStreamOperator<Object>() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.1
            public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Object>> output) {
                super.setup(streamTask, streamConfig, output);
            }
        };
        StreamConfig streamConfig = new StreamConfig(new Configuration());
        streamConfig.setOperatorID(new OperatorID());
        abstractStreamOperator.setup(new MockStreamTaskBuilder(environment).setExecutionConfig(executionConfig).build(), streamConfig, new CollectorOutput(new ArrayList()));
        StreamTaskStateInitializerImpl streamTaskStateInitializerImpl = new StreamTaskStateInitializerImpl(environment, new MemoryStateBackend());
        KeyedStateBackend keyedStateBackend = (KeyedStateBackend) Mockito.mock(KeyedStateBackend.class);
        AsyncKeyedStateBackend asyncKeyedStateBackend = (AsyncKeyedStateBackend) Mockito.mock(AsyncKeyedStateBackend.class);
        DefaultKeyedStateStore defaultKeyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, new SerializerFactory() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.2
            public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
                return typeInformation.createSerializer(executionConfig.getSerializerConfig());
            }
        });
        ((KeyedStateBackend) Mockito.doAnswer(invocationOnMock -> {
            atomicReference.set(invocationOnMock.getArguments()[2]);
            return null;
        }).when(keyedStateBackend)).getPartitionedState(Matchers.any(), (TypeSerializer) Mockito.any(TypeSerializer.class), (StateDescriptor) Mockito.any(StateDescriptor.class));
        ((AsyncKeyedStateBackend) Mockito.doAnswer(invocationOnMock2 -> {
            atomicReference.set(invocationOnMock2.getArguments()[0]);
            return null;
        }).when(asyncKeyedStateBackend)).createState((org.apache.flink.runtime.state.v2.StateDescriptor) Mockito.any(org.apache.flink.runtime.state.v2.StateDescriptor.class));
        abstractStreamOperator.initializeState(streamTaskStateInitializerImpl);
        abstractStreamOperator.getRuntimeContext().setKeyedStateStore(defaultKeyedStateStore);
        abstractStreamOperator.getRuntimeContext().setKeyedStateStoreV2(new DefaultKeyedStateStoreV2(asyncKeyedStateBackend));
        return abstractStreamOperator;
    }

    private static AbstractStreamOperator<?> createListPlainMockOp() throws Exception {
        AbstractStreamOperator<?> abstractStreamOperator = (AbstractStreamOperator) Mockito.mock(AbstractStreamOperator.class);
        final ExecutionConfig executionConfig = new ExecutionConfig();
        KeyedStateBackend keyedStateBackend = (KeyedStateBackend) Mockito.mock(KeyedStateBackend.class);
        DefaultKeyedStateStore defaultKeyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, new SerializerFactory() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.3
            public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
                return typeInformation.createSerializer(executionConfig.getSerializerConfig());
            }
        });
        Mockito.when(abstractStreamOperator.getExecutionConfig()).thenReturn(executionConfig);
        ((KeyedStateBackend) Mockito.doAnswer(new Answer<ListState<String>>() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.4
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public ListState<String> m52answer(InvocationOnMock invocationOnMock) throws Throwable {
                ListStateDescriptor listStateDescriptor = (ListStateDescriptor) invocationOnMock.getArguments()[2];
                AbstractKeyedStateBackend createKeyedStateBackend = new MemoryStateBackend().createKeyedStateBackend(new KeyedStateBackendParametersImpl(new DummyEnvironment("test_task", 1, 0), new JobID(), "test_op", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry()));
                createKeyedStateBackend.setCurrentKey(0);
                return createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor);
            }
        }).when(keyedStateBackend)).getPartitionedState(Matchers.any(), (TypeSerializer) Mockito.any(TypeSerializer.class), (StateDescriptor) Mockito.any(ListStateDescriptor.class));
        Mockito.when(abstractStreamOperator.getKeyedStateStore()).thenReturn(defaultKeyedStateStore);
        Mockito.when(abstractStreamOperator.getOperatorID()).thenReturn(new OperatorID());
        return abstractStreamOperator;
    }

    private static AbstractStreamOperator<?> createMapPlainMockOp() throws Exception {
        AbstractStreamOperator<?> abstractStreamOperator = (AbstractStreamOperator) Mockito.mock(AbstractStreamOperator.class);
        final ExecutionConfig executionConfig = new ExecutionConfig();
        KeyedStateBackend keyedStateBackend = (KeyedStateBackend) Mockito.mock(KeyedStateBackend.class);
        DefaultKeyedStateStore defaultKeyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, new SerializerFactory() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.5
            public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
                return typeInformation.createSerializer(executionConfig.getSerializerConfig());
            }
        });
        Mockito.when(abstractStreamOperator.getExecutionConfig()).thenReturn(executionConfig);
        ((KeyedStateBackend) Mockito.doAnswer(new Answer<MapState<Integer, String>>() { // from class: org.apache.flink.streaming.api.operators.StreamingRuntimeContextTest.6
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public MapState<Integer, String> m53answer(InvocationOnMock invocationOnMock) throws Throwable {
                MapStateDescriptor mapStateDescriptor = (MapStateDescriptor) invocationOnMock.getArguments()[2];
                AbstractKeyedStateBackend createKeyedStateBackend = new MemoryStateBackend().createKeyedStateBackend(new KeyedStateBackendParametersImpl(new DummyEnvironment("test_task", 1, 0), new JobID(), "test_op", IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), TtlTimeProvider.DEFAULT, new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry()));
                createKeyedStateBackend.setCurrentKey(0);
                return createKeyedStateBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mapStateDescriptor);
            }
        }).when(keyedStateBackend)).getPartitionedState(Matchers.any(), (TypeSerializer) Mockito.any(TypeSerializer.class), (StateDescriptor) Mockito.any(MapStateDescriptor.class));
        Mockito.when(abstractStreamOperator.getKeyedStateStore()).thenReturn(defaultKeyedStateStore);
        Mockito.when(abstractStreamOperator.getOperatorID()).thenReturn(new OperatorID());
        Mockito.when(abstractStreamOperator.getProcessingTimeService()).thenReturn(new TestProcessingTimeService());
        return abstractStreamOperator;
    }
}
