package org.apache.flink.state.changelog;

import java.util.Collection;
import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation;
import org.apache.flink.util.Preconditions;

/**
 * Changelog state backend that wraps another {@link StateBackend} (the "delegated" backend handed
 * to the superclass constructor).
 *
 * <p>On restore it builds a {@link ChangelogKeyedStateBackend} on top of the base keyed backend and
 * starts a {@link PeriodicMaterializationManager} for it.
 */
@Internal
public class ChangelogStateBackend extends AbstractChangelogStateBackend implements ConfigurableStateBackend {
    private static final long serialVersionUID = 1000L;

    /**
     * Creates a changelog state backend around the given backend.
     *
     * @param stateBackend the backend to delegate to; forwarded unchanged to the superclass
     */
    ChangelogStateBackend(StateBackend stateBackend) {
        super(stateBackend);
    }

    /**
     * Applies the given configuration to the delegated backend, if that backend is configurable.
     *
     * @param readableConfig configuration forwarded to the delegated backend
     * @param classLoader class loader forwarded to the delegated backend
     * @return a new {@code ChangelogStateBackend} wrapping the re-configured delegate when the
     *     delegate implements {@link ConfigurableStateBackend}; otherwise this instance
     * @throws IllegalConfigurationException as declared by the delegate's {@code configure}
     */
    @Override
    public StateBackend configure(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
        if (!(this.delegatedStateBackend instanceof ConfigurableStateBackend)) {
            // Nothing to configure on the delegate, so this wrapper can be reused as-is.
            return this;
        }
        // configure(...) is declared on ConfigurableStateBackend, not on StateBackend, so the
        // delegate has to be narrowed explicitly; the instanceof check above makes the cast safe.
        ConfigurableStateBackend configurableDelegate = (ConfigurableStateBackend) this.delegatedStateBackend;
        return new ChangelogStateBackend(configurableDelegate.configure(readableConfig, classLoader));
    }

    /**
     * Restores the keyed state backend and starts periodic materialization for it.
     *
     * @param environment task environment; supplies the changelog storage, task name, execution
     *     config, user-code class loader, mailbox executor, async thread pool and checkpoint
     *     storage access
     * @param str identifier passed to the changelog writer and to the materialization manager
     *     (presumably the operator identifier -- confirm against the superclass)
     * @param keyGroupRange key-group range passed to the changelog writer
     * @param ttlTimeProvider TTL time provider passed to the changelog keyed backend
     * @param metricGroup parent metric group for the backend and materialization metric groups
     * @param collection state handles passed to {@link ChangelogBackendRestoreOperation#restore}
     * @param baseBackendBuilder builder for the base keyed backend, passed to the restore operation
     * @param <K> key type
     * @return the keyed backend returned by the restore operation
     * @throws Exception declared by the overridden method; the changelog storage, mailbox
     *     executor and async thread pool are each passed through {@code
     *     Preconditions.checkNotNull}
     */
    @Override // org.apache.flink.state.changelog.AbstractChangelogStateBackend
    protected <K> CheckpointableKeyedStateBackend<K> restore(Environment environment, String str, KeyGroupRange keyGroupRange, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<ChangelogStateBackendHandle> collection, ChangelogBackendRestoreOperation.BaseBackendBuilder<K> baseBackendBuilder) throws Exception {
        StateChangelogStorage changelogStorage = (StateChangelogStorage) Preconditions.checkNotNull(
                environment.getTaskStateManager().getStateChangelogStorage(),
                "Changelog storage is null when creating and restoring the ChangelogKeyedStateBackend.");
        String subtaskName = environment.getTaskInfo().getTaskNameWithSubtasks();
        ExecutionConfig executionConfig = environment.getExecutionConfig();
        ChangelogStateFactory changelogStateFactory = new ChangelogStateFactory();

        // The callback wraps the base backend in a ChangelogKeyedStateBackend and hands its
        // restore target back to the restore operation.
        CheckpointableKeyedStateBackend<K> keyedStateBackend = ChangelogBackendRestoreOperation.restore(
                environment.getUserCodeClassLoader().asClassLoader(),
                collection,
                baseBackendBuilder,
                (baseBackend, baseState) ->
                        new ChangelogKeyedStateBackend(
                                        baseBackend,
                                        subtaskName,
                                        executionConfig,
                                        ttlTimeProvider,
                                        new ChangelogStateBackendMetricGroup(metricGroup),
                                        changelogStorage.createWriter(str, keyGroupRange, environment.getMainMailboxExecutor()),
                                        baseState,
                                        environment.getCheckpointStorageAccess(),
                                        changelogStateFactory)
                                .getChangelogRestoreTarget());

        // NOTE(review): assumes the restore operation returns the ChangelogKeyedStateBackend
        // created in the callback above -- otherwise this cast fails at runtime.
        ChangelogKeyedStateBackend changelogKeyedStateBackend = (ChangelogKeyedStateBackend) keyedStateBackend;

        PeriodicMaterializationManager materializationManager = new PeriodicMaterializationManager(
                (MailboxExecutor) Preconditions.checkNotNull(environment.getMainMailboxExecutor()),
                (ExecutorService) Preconditions.checkNotNull(environment.getAsyncOperationsThreadPool()),
                subtaskName,
                // Failures reported by the manager are forwarded to the task as an AsynchronousException.
                (message, cause) -> environment.failExternally(new AsynchronousException(message, cause)),
                changelogKeyedStateBackend,
                new ChangelogMaterializationMetricGroup(metricGroup),
                executionConfig.getPeriodicMaterializeIntervalMillis(),
                executionConfig.getMaterializationMaxAllowedFailures(),
                str);

        // Register before starting; presumably this lets the backend close the manager when the
        // backend itself is closed.
        changelogKeyedStateBackend.registerCloseable(materializationManager);
        materializationManager.start();
        return keyedStateBackend;
    }
}
