package org.apache.flink.runtime.state;

import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.StateObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/SnapshotStrategyRunner.class */
public final class SnapshotStrategyRunner<T extends StateObject, SR extends SnapshotResources> {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotStrategyRunner.class);
    private static final String LOG_SYNC_COMPLETED_TEMPLATE = "{} ({}, synchronous part) in thread {} took {} ms.";
    private static final String LOG_ASYNC_COMPLETED_TEMPLATE = "{} ({}, asynchronous part) in thread {} took {} ms.";

    @Nonnull
    private final String description;

    @Nonnull
    private final SnapshotStrategy<T, SR> snapshotStrategy;

    @Nonnull
    private final CloseableRegistry cancelStreamRegistry;

    @Nonnull
    private final SnapshotExecutionType executionType;

    public SnapshotStrategyRunner(@Nonnull String str, @Nonnull SnapshotStrategy<T, SR> snapshotStrategy, @Nonnull CloseableRegistry closeableRegistry, @Nonnull SnapshotExecutionType snapshotExecutionType) {
        this.description = str;
        this.snapshotStrategy = snapshotStrategy;
        this.cancelStreamRegistry = closeableRegistry;
        this.executionType = snapshotExecutionType;
    }

    @Nonnull
    public final RunnableFuture<SnapshotResult<T>> snapshot(long j, long j2, @Nonnull final CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        final SR syncPrepareResources = this.snapshotStrategy.syncPrepareResources(j);
        logCompletedInternal(LOG_SYNC_COMPLETED_TEMPLATE, checkpointStreamFactory, currentTimeMillis);
        final SnapshotStrategy.SnapshotResultSupplier<T> asyncSnapshot = this.snapshotStrategy.asyncSnapshot(syncPrepareResources, j, j2, checkpointStreamFactory, checkpointOptions);
        AsyncSnapshotCallable<SnapshotResult<T>>.AsyncSnapshotTask asyncSnapshotFutureTask = new AsyncSnapshotCallable<SnapshotResult<T>>() { // from class: org.apache.flink.runtime.state.SnapshotStrategyRunner.1
            /* JADX INFO: Access modifiers changed from: protected */
            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            public SnapshotResult<T> callInternal() throws Exception {
                return asyncSnapshot.get(this.snapshotCloseableRegistry);
            }

            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void cleanupProvidedResources() {
                if (syncPrepareResources != null) {
                    syncPrepareResources.release();
                }
            }

            @Override // org.apache.flink.runtime.state.AsyncSnapshotCallable
            protected void logAsyncSnapshotComplete(long j3) {
                SnapshotStrategyRunner.this.logCompletedInternal(SnapshotStrategyRunner.LOG_ASYNC_COMPLETED_TEMPLATE, checkpointStreamFactory, j3);
            }
        }.toAsyncSnapshotFutureTask(this.cancelStreamRegistry);
        if (this.executionType == SnapshotExecutionType.SYNCHRONOUS) {
            asyncSnapshotFutureTask.run();
        }
        return asyncSnapshotFutureTask;
    }

    private void logCompletedInternal(@Nonnull String str, @Nonnull Object obj, long j) {
        LOG.debug(str, new Object[]{this.description, obj, Thread.currentThread(), Long.valueOf(System.currentTimeMillis() - j)});
    }

    public String toString() {
        return "SnapshotStrategy {" + this.description + "}";
    }
}
