package org.apache.flink.runtime.state.ttl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/state/ttl/StateBackendTestContext.class */
public abstract class StateBackendTestContext {
    public static final int NUMBER_OF_KEY_GROUPS = 10;
    private final TtlTimeProvider timeProvider;
    private MockEnvironment env;
    private CheckpointableKeyedStateBackend<String> keyedStateBackend;
    private final StateBackend stateBackend = (StateBackend) Preconditions.checkNotNull(createStateBackend());
    private final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
    private final CheckpointStreamFactory checkpointStreamFactory = createCheckpointStreamFactory();
    private final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
    private final List<KeyedStateHandle> snapshots = new ArrayList();

    /* JADX INFO: Access modifiers changed from: protected */
    public StateBackendTestContext(TtlTimeProvider ttlTimeProvider) {
        this.timeProvider = (TtlTimeProvider) Preconditions.checkNotNull(ttlTimeProvider);
    }

    protected abstract StateBackend createStateBackend();

    protected CheckpointStorage createCheckpointStorage() {
        if (this.stateBackend instanceof CheckpointStorage) {
            return this.stateBackend;
        }
        throw new IllegalStateException("The state backend under test does not implement CheckpointStorage.Please override 'createCheckpointStorage' and provide an appropriatecheckpoint storage instance");
    }

    private CheckpointStreamFactory createCheckpointStreamFactory() {
        try {
            return createCheckpointStorage().createCheckpointStorage(new JobID()).resolveCheckpointStorageLocation(2L, this.checkpointOptions.getTargetLocation());
        } catch (IOException e) {
            throw new RuntimeException("unexpected");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createAndRestoreKeyedStateBackend(KeyedStateHandle keyedStateHandle) {
        createAndRestoreKeyedStateBackend(10, keyedStateHandle);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void createAndRestoreKeyedStateBackend(int i, KeyedStateHandle keyedStateHandle) {
        Collection arrayList;
        if (keyedStateHandle == null) {
            arrayList = Collections.emptyList();
        } else {
            arrayList = new ArrayList(1);
            arrayList.add(keyedStateHandle);
        }
        this.env = MockEnvironment.builder().build();
        try {
            disposeKeyedStateBackend();
            this.keyedStateBackend = this.stateBackend.createKeyedStateBackend(this.env, new JobID(), "test", StringSerializer.INSTANCE, i, new KeyGroupRange(0, i - 1), this.env.getTaskKvStateRegistry(), this.timeProvider, new UnregisteredMetricsGroup(), arrayList, new CloseableRegistry());
        } catch (Exception e) {
            throw new RuntimeException("unexpected", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispose() throws Exception {
        disposeKeyedStateBackend();
        Iterator<KeyedStateHandle> it = this.snapshots.iterator();
        while (it.hasNext()) {
            it.next().discardState();
        }
        this.snapshots.clear();
        this.sharedStateRegistry.close();
        if (this.env != null) {
            this.env.close();
        }
    }

    private void disposeKeyedStateBackend() {
        if (this.keyedStateBackend != null) {
            this.keyedStateBackend.dispose();
            this.keyedStateBackend = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KeyedStateHandle takeSnapshot() throws Exception {
        KeyedStateHandle jobManagerOwnedSnapshot = triggerSnapshot().get().getJobManagerOwnedSnapshot();
        if (jobManagerOwnedSnapshot != null) {
            jobManagerOwnedSnapshot.registerSharedStates(this.sharedStateRegistry);
        }
        return jobManagerOwnedSnapshot;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> triggerSnapshot() throws Exception {
        RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = this.keyedStateBackend.snapshot(682375462392L, 10L, this.checkpointStreamFactory, this.checkpointOptions);
        if (!snapshot.isDone()) {
            snapshot.run();
        }
        return snapshot;
    }

    public void setCurrentKey(String str) {
        Preconditions.checkNotNull(this.keyedStateBackend, "keyed backend is not initialised");
        this.keyedStateBackend.setCurrentKey(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <N, S extends State, V> S createState(StateDescriptor<S, V> stateDescriptor, N n) throws Exception {
        InternalKvState orCreateKeyedState = this.keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
        orCreateKeyedState.setCurrentNamespace(n);
        return orCreateKeyedState;
    }

    public <B extends CheckpointableKeyedStateBackend<String>> B getKeyedStateBackend() {
        return (B) this.keyedStateBackend;
    }
}
