package org.apache.flink.streaming.api.operators;

import java.util.Optional;
import java.util.concurrent.FutureTask;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulatorTest;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest.class */
class StreamOperatorStateHandlerTest {

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest$CancelableFuture.class */
    private static class CancelableFuture<T> extends FutureTask<T> {
        public CancelableFuture() {
            super(() -> {
                throw new UnsupportedOperationException();
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest$TestStateSnapshotContextSynchronousImpl.class */
    private static class TestStateSnapshotContextSynchronousImpl extends StateSnapshotContextSynchronousImpl {
        public TestStateSnapshotContextSynchronousImpl(long j, long j2, CloseableRegistry closeableRegistry) {
            super(j, j2, new MemCheckpointStreamFactory(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), new KeyGroupRange(0, 2), closeableRegistry);
            this.keyedStateCheckpointClosingFuture = new CancelableFuture();
            this.operatorStateCheckpointClosingFuture = new CancelableFuture();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/StreamOperatorStateHandlerTest$UnUsedKeyContext.class */
    private static class UnUsedKeyContext implements KeyContext {
        private UnUsedKeyContext() {
        }

        public void setCurrentKey(Object obj) {
            throw new UnsupportedOperationException();
        }

        public Object getCurrentKey() {
            throw new UnsupportedOperationException();
        }
    }

    StreamOperatorStateHandlerTest() {
    }

    @Test
    void testFailingBackendSnapshotMethod() throws Exception {
        CloseableRegistry closeableRegistry = new CloseableRegistry();
        try {
            CancelableFuture cancelableFuture = new CancelableFuture();
            CancelableFuture cancelableFuture2 = new CancelableFuture();
            CancelableFuture cancelableFuture3 = new CancelableFuture();
            CancelableFuture cancelableFuture4 = new CancelableFuture();
            CancelableFuture cancelableFuture5 = new CancelableFuture();
            CancelableFuture cancelableFuture6 = new CancelableFuture();
            OperatorSnapshotFutures operatorSnapshotFutures = new OperatorSnapshotFutures(cancelableFuture, cancelableFuture2, cancelableFuture3, cancelableFuture4, cancelableFuture5, cancelableFuture6);
            TestStateSnapshotContextSynchronousImpl testStateSnapshotContextSynchronousImpl = new TestStateSnapshotContextSynchronousImpl(42L, 1L, closeableRegistry);
            testStateSnapshotContextSynchronousImpl.getRawKeyedOperatorStateOutput();
            testStateSnapshotContextSynchronousImpl.getRawOperatorStateOutput();
            StreamOperatorStateContext streamOperatorStateContext = new StreamTaskStateInitializerImpl(new MockEnvironmentBuilder().build(), new HashMapStateBackend()).streamOperatorStateContext(new OperatorID(), "whatever", new TestProcessingTimeService(), new UnUsedKeyContext(), IntSerializer.INSTANCE, closeableRegistry, new InterceptingOperatorMetricGroup(), 1.0d, false, false);
            StreamOperatorStateHandler streamOperatorStateHandler = new StreamOperatorStateHandler(streamOperatorStateContext, new ExecutionConfig(), closeableRegistry);
            StreamOperatorStateHandler.CheckpointedStreamOperator checkpointedStreamOperator = new StreamOperatorStateHandler.CheckpointedStreamOperator() { // from class: org.apache.flink.streaming.api.operators.StreamOperatorStateHandlerTest.1
                public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
                    stateInitializationContext.getKeyedStateStore().getState(new ValueStateDescriptor("keyedStateField", LongSerializer.INSTANCE)).update(42L);
                    stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("operatorStateField", LongSerializer.INSTANCE)).add(42L);
                }

                public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
                    throw new ExpectedTestException();
                }
            };
            streamOperatorStateHandler.setCurrentKey("44");
            streamOperatorStateHandler.initializeOperatorState(checkpointedStreamOperator);
            Assertions.assertThat(streamOperatorStateContext.operatorStateBackend().getRegisteredStateNames()).isNotEmpty();
            Assertions.assertThat(streamOperatorStateContext.keyedStateBackend().numKeyValueStatesByName()).isOne();
            Assertions.assertThatThrownBy(() -> {
                streamOperatorStateHandler.snapshotState(checkpointedStreamOperator, Optional.of(streamOperatorStateContext.internalTimerServiceManager()), "42", 42L, 42L, CheckpointOptions.forCheckpointWithDefaultLocation(), new MemCheckpointStreamFactory(HashBufferAccumulatorTest.NETWORK_BUFFER_SIZE), operatorSnapshotFutures, testStateSnapshotContextSynchronousImpl, false, false);
            }).isInstanceOfSatisfying(CheckpointException.class, checkpointException -> {
                Assertions.assertThat(ExceptionUtils.findThrowableWithMessage(checkpointException, ExpectedTestException.MESSAGE)).isPresent();
            });
            Assertions.assertThat(cancelableFuture).isCancelled();
            Assertions.assertThat(cancelableFuture2).isCancelled();
            Assertions.assertThat(testStateSnapshotContextSynchronousImpl.getKeyedStateStreamFuture()).isCancelled();
            Assertions.assertThat(cancelableFuture3).isCancelled();
            Assertions.assertThat(cancelableFuture4).isCancelled();
            Assertions.assertThat(testStateSnapshotContextSynchronousImpl.getOperatorStateStreamFuture()).isCancelled();
            Assertions.assertThat(cancelableFuture5).isCancelled();
            Assertions.assertThat(cancelableFuture6).isCancelled();
            streamOperatorStateHandler.dispose();
            Assertions.assertThat(streamOperatorStateContext.operatorStateBackend().getRegisteredBroadcastStateNames()).isEmpty();
            Assertions.assertThat(streamOperatorStateContext.operatorStateBackend().getRegisteredStateNames()).isEmpty();
            Assertions.assertThat(streamOperatorStateContext.keyedStateBackend().numKeyValueStatesByName()).isZero();
            closeableRegistry.close();
        } catch (Throwable th) {
            try {
                closeableRegistry.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
