package org.apache.flink.runtime.state;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest.class */
class StateSnapshotTransformerTest {
    /** Keyed state backend under test: the test states are created on it and the snapshots are taken from it. */
    private final CheckpointableKeyedStateBackend<Integer> backend;
    /** Stream factory handed to every {@code backend.snapshot(...)} call made by this test. */
    private final BlockerCheckpointStreamFactory streamFactory;
    /** Transform factory passed to {@code createOrUpdateInternalState} for each of the value, list and map test states. */
    private final StateSnapshotTransformer.StateSnapshotTransformFactory<?> snapshotTransformFactory = SingleThreadAccessCheckingSnapshotTransformFactory.create();

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$SingleThreadAccessChecker.class */
    private static class SingleThreadAccessChecker implements Serializable {
        private static final long serialVersionUID = 131020282727167064L;
        private final AtomicReference<Thread> currentThreadRef = new AtomicReference<>();
        static final /* synthetic */ boolean $assertionsDisabled;

        private SingleThreadAccessChecker() {
        }

        void checkSingleThreadAccess() {
            this.currentThreadRef.compareAndSet(null, Thread.currentThread());
            if (!$assertionsDisabled && !Thread.currentThread().equals(this.currentThreadRef.get())) {
                throw new AssertionError("Concurrent access from another thread");
            }
        }

        static {
            $assertionsDisabled = !StateSnapshotTransformerTest.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$SingleThreadAccessCheckingSnapshotTransformFactory.class */
    /**
     * Snapshot transform factory used to detect cross-thread use.
     *
     * <p>The factory itself and every transformer it hands out each own a separate
     * {@link SingleThreadAccessChecker}; the factory methods and {@code filterOrTransform} invoke
     * the respective checker before doing anything else. The produced transformers return their
     * input unchanged.
     *
     * @param <T> type of the deserialized state values
     */
    private static class SingleThreadAccessCheckingSnapshotTransformFactory<T> implements StateSnapshotTransformer.StateSnapshotTransformFactory<T> {
        /** Guards the two {@code createFor...} factory methods of this instance. */
        private final SingleThreadAccessChecker singleThreadAccessChecker = new SingleThreadAccessChecker();

        private SingleThreadAccessCheckingSnapshotTransformFactory() {
        }

        /** Creates a new factory instance, exposed through the factory interface type. */
        static <T> StateSnapshotTransformer.StateSnapshotTransformFactory<T> create() {
            // Diamond instead of the raw type so the result is a properly parameterized factory.
            return new SingleThreadAccessCheckingSnapshotTransformFactory<>();
        }

        /** Checks the calling thread, then returns a fresh pass-through transformer for deserialized values. */
        public Optional<StateSnapshotTransformer<T>> createForDeserializedState() {
            this.singleThreadAccessChecker.checkSingleThreadAccess();
            // The return type drives inference of T1, so no unchecked cast is needed.
            return createStateSnapshotTransformer();
        }

        /** Checks the calling thread, then returns a fresh pass-through transformer for serialized values. */
        public Optional<StateSnapshotTransformer<byte[]>> createForSerializedState() {
            this.singleThreadAccessChecker.checkSingleThreadAccess();
            return createStateSnapshotTransformer();
        }

        /**
         * Builds a transformer that checks the calling thread against its own checker and returns
         * the given value unchanged.
         */
        private <T1> Optional<StateSnapshotTransformer<T1>> createStateSnapshotTransformer() {
            return Optional.of(new StateSnapshotTransformer<T1>() {
                // Each transformer gets its own checker, independent of the factory's checker.
                private final SingleThreadAccessChecker singleThreadAccessChecker = new SingleThreadAccessChecker();

                @Nullable
                public T1 filterOrTransform(@Nullable T1 t1) {
                    this.singleThreadAccessChecker.checkSingleThreadAccess();
                    return t1;
                }
            });
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$TestListState.class */
    /**
     * List-state fixture: registers a list state named {@code "TestListState"} on the enclosing
     * test's backend and fills it with random strings.
     */
    private class TestListState extends TestState {
        private final InternalListState<Integer, VoidNamespace, String> state;

        /**
         * Creates the list state with the enclosing test's snapshot transform factory and selects
         * {@code VoidNamespace.INSTANCE} as the current namespace.
         *
         * @throws Exception if the backend fails to create the state
         */
        private TestListState() throws Exception {
            // Diamonds instead of raw types: a raw descriptor turns the call into an unchecked
            // invocation, which erases the return type assigned to the typed field.
            this.state = StateSnapshotTransformerTest.this.backend.createOrUpdateInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ListStateDescriptor<>(
                            "TestListState",
                            new SingleThreadAccessCheckingTypeSerializer<>(StringSerializer.INSTANCE)),
                    StateSnapshotTransformerTest.this.snapshotTransformFactory);
            this.state.setCurrentNamespace(VoidNamespace.INSTANCE);
        }

        /** Appends between 0 and 9 random strings to the list state. */
        @Override
        void setToRandomValue() throws Exception {
            int elementCount = this.rnd.nextInt(10);
            for (int i = 0; i < elementCount; i++) {
                this.state.add(getRandomString());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$TestMapState.class */
    /**
     * Map-state fixture: registers a map state named {@code "TestMapState"} on the enclosing
     * test's backend and fills it with random string entries.
     */
    private class TestMapState extends TestState {
        private final InternalMapState<Integer, VoidNamespace, String, String> state;

        /**
         * Creates the map state with the enclosing test's snapshot transform factory and selects
         * {@code VoidNamespace.INSTANCE} as the current namespace.
         *
         * @throws Exception if the backend fails to create the state
         */
        private TestMapState() throws Exception {
            // Diamond instead of the raw descriptor type so the call stays fully type-checked.
            this.state = StateSnapshotTransformerTest.this.backend.createOrUpdateInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new MapStateDescriptor<>("TestMapState", StringSerializer.INSTANCE, StringSerializer.INSTANCE),
                    StateSnapshotTransformerTest.this.snapshotTransformFactory);
            this.state.setCurrentNamespace(VoidNamespace.INSTANCE);
        }

        /** Puts between 0 and 9 random key/value pairs into the map state. */
        @Override
        void setToRandomValue() throws Exception {
            int entryCount = this.rnd.nextInt(10);
            for (int i = 0; i < entryCount; i++) {
                this.state.put(getRandomString(), getRandomString());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$TestState.class */
    /**
     * Common base of the state fixtures used by this test. It supplies the random source and a
     * random-string helper; subclasses decide how a random value is written to their state.
     */
    private abstract class TestState {
        /** Random source shared by the helper below and by subclasses. */
        final Random rnd;

        private TestState() {
            this.rnd = new Random();
        }

        /**
         * Returns a random string obtained from {@code StringUtils.getRandomString} with the
         * arguments 5 and 10 (presumably the length bounds; confirm against that helper).
         */
        String getRandomString() {
            return StringUtils.getRandomString(this.rnd, 5, 10);
        }

        /**
         * Writes a randomly generated value into the state owned by the subclass.
         *
         * @throws Exception if the state access fails
         */
        abstract void setToRandomValue() throws Exception;
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/StateSnapshotTransformerTest$TestValueState.class */
    /**
     * Value-state fixture: registers a value state named {@code "TestValueState"} on the enclosing
     * test's backend and overwrites it with a random string.
     */
    private class TestValueState extends TestState {
        private final InternalValueState<Integer, VoidNamespace, String> state;

        /**
         * Creates the value state with the enclosing test's snapshot transform factory and selects
         * {@code VoidNamespace.INSTANCE} as the current namespace.
         *
         * @throws Exception if the backend fails to create the state
         */
        private TestValueState() throws Exception {
            // Diamond instead of the raw descriptor type so the call stays fully type-checked.
            this.state = StateSnapshotTransformerTest.this.backend.createOrUpdateInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<>("TestValueState", StringSerializer.INSTANCE),
                    StateSnapshotTransformerTest.this.snapshotTransformFactory);
            this.state.setCurrentNamespace(VoidNamespace.INSTANCE);
        }

        /** Replaces the current value with a new random string. */
        @Override
        void setToRandomValue() throws Exception {
            this.state.update(getRandomString());
        }
    }

    /**
     * Stores the backend to exercise and the stream factory to use for its snapshots.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     *
     * @param checkpointableKeyedStateBackend backend on which states are created and snapshots are taken
     * @param blockerCheckpointStreamFactory stream factory passed to each snapshot call
     */
    public StateSnapshotTransformerTest(CheckpointableKeyedStateBackend<Integer> checkpointableKeyedStateBackend, BlockerCheckpointStreamFactory blockerCheckpointStreamFactory) {
        streamFactory = blockerCheckpointStreamFactory;
        backend = checkpointableKeyedStateBackend;
    }

    /**
     * For each state fixture (value, list, map): writes a random value under the keys 0..99, then
     * requests two snapshots (checkpoint ids 1 and 2) from the backend, runs each on its own
     * thread, waits for both threads and finally calls {@code get()} on both futures so that a
     * failure raised while a snapshot was running is propagated to the caller.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     *
     * @throws Exception if populating the state, taking a snapshot or running a snapshot fails
     */
    public void testNonConcurrentSnapshotTransformerAccess() throws Exception {
        for (TestState testState : Arrays.asList(new TestValueState(), new TestListState(), new TestMapState())) {
            for (int key = 0; key < 100; key++) {
                this.backend.setCurrentKey(key);
                testState.setToRandomValue();
            }

            CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
            // Wildcard instead of the raw type: the result value is never inspected here.
            RunnableFuture<?> firstSnapshot = this.backend.snapshot(1L, 0L, this.streamFactory, checkpointOptions);
            RunnableFuture<?> secondSnapshot = this.backend.snapshot(2L, 0L, this.streamFactory, checkpointOptions);

            Thread firstRunner = new Thread(firstSnapshot, "snapshot1");
            firstRunner.start();
            Thread secondRunner = new Thread(secondSnapshot, "snapshot2");
            secondRunner.start();

            firstRunner.join();
            secondRunner.join();

            // get() rethrows anything thrown inside the snapshot runs, e.g. a failed access check.
            firstSnapshot.get();
            secondSnapshot.get();
        }
    }
}
