package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.class */
/**
 * Runnable that executes the asynchronous part of a checkpoint for a task.
 *
 * <p>{@link #run()} finalizes the in-progress operator snapshots and reports the resulting state
 * through the task environment's task state manager. If finalization fails, the snapshots are
 * cancelled and, while the task is still running, the checkpoint is declined. {@link #close()}
 * aborts a still-running instance and reports incomplete checkpoint metrics.
 *
 * <p>The lifecycle is tracked in an {@link AtomicReference} holding an {@link
 * AsyncCheckpointState}; every transition is made with {@code compareAndSet}, so only one of
 * "complete", "close" or "fail" performs its side effects.
 */
public final class AsyncCheckpointRunnable implements Runnable, Closeable {
    public static final Logger LOG = LoggerFactory.getLogger(AsyncCheckpointRunnable.class);

    /** Divisor used to convert {@link System#nanoTime()} differences to milliseconds. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final String taskName;
    private final Consumer<AsyncCheckpointRunnable> unregisterConsumer;
    private final boolean isTaskDeployedAsFinished;
    private final boolean isTaskFinished;
    private final Supplier<Boolean> isTaskRunning;
    private final Environment taskEnvironment;
    private final AsyncExceptionHandler asyncExceptionHandler;
    private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
    private final CheckpointMetaData checkpointMetaData;
    private final CheckpointMetricsBuilder checkpointMetrics;
    private final long asyncConstructionNanos;

    /** Completed by {@link #run()}: normally on a regular exit, exceptionally on failure. */
    private final CompletableFuture<Void> finishedFuture = new CompletableFuture<>();

    private final AtomicReference<AsyncCheckpointState> asyncCheckpointState =
            new AtomicReference<>(AsyncCheckpointState.RUNNING);

    /**
     * Lifecycle states of the runnable. Instances start in {@link #RUNNING}.
     *
     * <p>Decompiler note: the access modifier was widened from package-private.
     */
    public enum AsyncCheckpointState {
        RUNNING,
        DISCARDED,
        COMPLETED
    }

    /**
     * Holder for the outcome of finalizing all operator snapshots.
     *
     * <p>Decompiler note: the access modifier was widened from private.
     */
    public static class SnapshotsFinalizeResult {
        final TaskStateSnapshot jobManagerTaskOperatorSubtaskStates;
        final TaskStateSnapshot localTaskOperatorSubtaskStates;
        final long bytesPersistedDuringAlignment;

        /**
         * @param jobManagerTaskOperatorSubtaskStates snapshot reported as job-manager-owned state
         * @param localTaskOperatorSubtaskStates snapshot reported as task-local state
         * @param bytesPersistedDuringAlignment value later passed to {@code
         *     CheckpointMetricsBuilder#setBytesPersistedDuringAlignment}
         */
        public SnapshotsFinalizeResult(
                TaskStateSnapshot jobManagerTaskOperatorSubtaskStates,
                TaskStateSnapshot localTaskOperatorSubtaskStates,
                long bytesPersistedDuringAlignment) {
            this.jobManagerTaskOperatorSubtaskStates = jobManagerTaskOperatorSubtaskStates;
            this.localTaskOperatorSubtaskStates = localTaskOperatorSubtaskStates;
            this.bytesPersistedDuringAlignment = bytesPersistedDuringAlignment;
        }
    }

    /**
     * Creates the runnable in state {@link AsyncCheckpointState#RUNNING}.
     *
     * @param operatorSnapshotsInProgress snapshot futures per operator; must not be null
     * @param checkpointMetaData metadata of the checkpoint; must not be null
     * @param checkpointMetrics metrics builder updated while running; must not be null
     * @param asyncConstructionNanos {@link System#nanoTime()} reference used to compute the async
     *     start delay and the async duration
     * @param taskName task name used in log and error messages; must not be null
     * @param unregisterConsumer invoked with this instance when {@link #run()} exits (not
     *     null-checked here)
     * @param taskEnvironment environment used to report or decline the checkpoint; must not be
     *     null
     * @param asyncExceptionHandler receives failures raised while declining the checkpoint; must
     *     not be null
     * @param isTaskDeployedAsFinished if true, {@link #run()} only waits for the channel-state
     *     futures and reports {@code TaskStateSnapshot.FINISHED_ON_RESTORE}
     * @param isTaskFinished flag passed to the {@link TaskStateSnapshot} instances created for
     *     reporting
     * @param isTaskRunning queried on failure to decide whether the checkpoint is declined (not
     *     null-checked here)
     */
    public AsyncCheckpointRunnable(
            Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress,
            CheckpointMetaData checkpointMetaData,
            CheckpointMetricsBuilder checkpointMetrics,
            long asyncConstructionNanos,
            String taskName,
            Consumer<AsyncCheckpointRunnable> unregisterConsumer,
            Environment taskEnvironment,
            AsyncExceptionHandler asyncExceptionHandler,
            boolean isTaskDeployedAsFinished,
            boolean isTaskFinished,
            Supplier<Boolean> isTaskRunning) {
        this.operatorSnapshotsInProgress = Preconditions.checkNotNull(operatorSnapshotsInProgress);
        this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
        this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
        this.asyncConstructionNanos = asyncConstructionNanos;
        this.taskName = Preconditions.checkNotNull(taskName);
        this.unregisterConsumer = unregisterConsumer;
        this.taskEnvironment = Preconditions.checkNotNull(taskEnvironment);
        this.asyncExceptionHandler = Preconditions.checkNotNull(asyncExceptionHandler);
        this.isTaskDeployedAsFinished = isTaskDeployedAsFinished;
        this.isTaskFinished = isTaskFinished;
        this.isTaskRunning = isTaskRunning;
    }

    /** Returns true while the state is still {@link AsyncCheckpointState#RUNNING}. */
    public boolean isRunning() {
        return asyncCheckpointState.get() == AsyncCheckpointState.RUNNING;
    }

    /**
     * Finalizes the snapshots and reports them, or handles the failure.
     *
     * <p>Regardless of the outcome, the runnable is passed to the unregister consumer and the
     * thread's file system safety net is closed exactly once, in the {@code finally} block.
     */
    @Override
    public void run() {
        final long asyncStartDelayMillis =
                (System.nanoTime() - asyncConstructionNanos) / NANOS_PER_MILLI;
        LOG.debug(
                "{} - started executing asynchronous part of checkpoint {}. Asynchronous start delay: {} ms",
                taskName,
                checkpointMetaData.getCheckpointId(),
                asyncStartDelayMillis);

        FileSystemSafetyNet.initializeSafetyNetForThread();
        try {
            final SnapshotsFinalizeResult snapshotsFinalizeResult =
                    isTaskDeployedAsFinished
                            ? finalizedFinishedSnapshots()
                            : finalizeNonFinishedSnapshots();

            final long asyncDurationMillis =
                    (System.nanoTime() - asyncConstructionNanos) / NANOS_PER_MILLI;

            checkpointMetrics.setBytesPersistedDuringAlignment(
                    snapshotsFinalizeResult.bytesPersistedDuringAlignment);
            checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);

            // Only report if nobody closed or failed this runnable in the meantime.
            if (asyncCheckpointState.compareAndSet(
                    AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) {
                reportCompletedSnapshotStates(
                        snapshotsFinalizeResult.jobManagerTaskOperatorSubtaskStates,
                        snapshotsFinalizeResult.localTaskOperatorSubtaskStates,
                        asyncDurationMillis);
            } else {
                LOG.debug(
                        "{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                        taskName,
                        checkpointMetaData.getCheckpointId());
            }

            finishedFuture.complete(null);
        } catch (Exception e) {
            LOG.info(
                    "{} - asynchronous part of checkpoint {} could not be completed.",
                    taskName,
                    checkpointMetaData.getCheckpointId(),
                    e);
            handleExecutionException(e);
            finishedFuture.completeExceptionally(e);
        } finally {
            // A single finally keeps a failure inside the cleanup itself from being mistaken
            // for a checkpoint failure and from triggering the cleanup a second time.
            unregisterConsumer.accept(this);
            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
        }
    }

    /**
     * Handles the "deployed as finished" case: waits on the input-channel and result-subpartition
     * state futures of every operator and returns {@code TaskStateSnapshot.FINISHED_ON_RESTORE}
     * for both snapshots with zero bytes persisted during alignment.
     *
     * @throws Exception if waiting on any of the futures fails
     */
    private SnapshotsFinalizeResult finalizedFinishedSnapshots() throws Exception {
        for (OperatorSnapshotFutures snapshotInProgress : operatorSnapshotsInProgress.values()) {
            snapshotInProgress.getInputChannelStateFuture().get();
            snapshotInProgress.getResultSubpartitionStateFuture().get();
        }
        return new SnapshotsFinalizeResult(
                TaskStateSnapshot.FINISHED_ON_RESTORE, TaskStateSnapshot.FINISHED_ON_RESTORE, 0L);
    }

    /**
     * Finalizes every operator's snapshot futures and collects the job-manager-owned and
     * task-local state per operator.
     *
     * <p>The bytes persisted during alignment are the sum, over all operators, of the state sizes
     * of the result-subpartition state and the input-channel state of the job-manager-owned
     * state.
     *
     * @throws Exception if finalizing an operator snapshot fails
     */
    private SnapshotsFinalizeResult finalizeNonFinishedSnapshots() throws Exception {
        final TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =
                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), isTaskFinished);
        final TaskStateSnapshot localTaskOperatorSubtaskStates =
                new TaskStateSnapshot(operatorSnapshotsInProgress.size(), isTaskFinished);

        long bytesPersistedDuringAlignment = 0L;
        for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry :
                operatorSnapshotsInProgress.entrySet()) {
            final OperatorID operatorId = entry.getKey();
            final OperatorSnapshotFinalizer finalizedSnapshots =
                    new OperatorSnapshotFinalizer(entry.getValue());

            jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorId, finalizedSnapshots.getJobManagerOwnedState());
            localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
                    operatorId, finalizedSnapshots.getTaskLocalState());

            bytesPersistedDuringAlignment =
                    bytesPersistedDuringAlignment
                            + finalizedSnapshots
                                    .getJobManagerOwnedState()
                                    .getResultSubpartitionState()
                                    .getStateSize()
                            + finalizedSnapshots
                                    .getJobManagerOwnedState()
                                    .getInputChannelState()
                                    .getStateSize();
        }

        return new SnapshotsFinalizeResult(
                jobManagerTaskOperatorSubtaskStates,
                localTaskOperatorSubtaskStates,
                bytesPersistedDuringAlignment);
    }

    /**
     * Reports the finalized snapshots and the built metrics to the task state manager.
     *
     * <p>A snapshot without state is reported as {@code null}.
     *
     * @param acknowledgedTaskStateSnapshot job-manager-owned snapshot
     * @param localTaskStateSnapshot task-local snapshot
     * @param asyncDurationMillis duration of the asynchronous part, used for logging only
     * @throws IllegalStateException (via {@code Preconditions.checkState}) if local state exists
     *     without corresponding job-manager-owned state
     */
    private void reportCompletedSnapshotStates(
            TaskStateSnapshot acknowledgedTaskStateSnapshot,
            TaskStateSnapshot localTaskStateSnapshot,
            long asyncDurationMillis) {
        final boolean hasAckState = acknowledgedTaskStateSnapshot.hasState();
        final boolean hasLocalState = localTaskStateSnapshot.hasState();

        Preconditions.checkState(
                hasAckState || !hasLocalState,
                "Found cached state but no corresponding primary state is reported to the job manager. This indicates a problem.");

        final CheckpointMetrics metrics =
                checkpointMetrics
                        .setBytesPersistedOfThisCheckpoint(
                                acknowledgedTaskStateSnapshot.getCheckpointedSize())
                        .setTotalBytesPersisted(acknowledgedTaskStateSnapshot.getStateSize())
                        .build();

        taskEnvironment
                .getTaskStateManager()
                .reportTaskStateSnapshots(
                        checkpointMetaData,
                        metrics,
                        hasAckState ? acknowledgedTaskStateSnapshot : null,
                        hasLocalState ? localTaskStateSnapshot : null);

        LOG.debug(
                "{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
                taskName,
                checkpointMetaData.getCheckpointId(),
                asyncDurationMillis);
        LOG.trace(
                "{} - reported the following states in snapshot for checkpoint {}: {}.",
                taskName,
                checkpointMetaData.getCheckpointId(),
                acknowledgedTaskStateSnapshot);
    }

    /**
     * Builds incomplete checkpoint metrics and reports them to the task state manager.
     *
     * @param totalBytesPersisted value for {@code setTotalBytesPersisted}
     * @param bytesPersistedOfThisCheckpoint value for {@code setBytesPersistedOfThisCheckpoint}
     */
    private void reportAbortedSnapshotStats(
            long totalBytesPersisted, long bytesPersistedOfThisCheckpoint) {
        final CheckpointMetrics metrics =
                checkpointMetrics
                        .setTotalBytesPersisted(totalBytesPersisted)
                        .setBytesPersistedOfThisCheckpoint(bytesPersistedOfThisCheckpoint)
                        .buildIncomplete();
        LOG.trace(
                "{} - report failed checkpoint stats: {} {}",
                taskName,
                checkpointMetaData.getCheckpointId(),
                metrics);
        taskEnvironment
                .getTaskStateManager()
                .reportIncompleteTaskStateSnapshots(checkpointMetaData, metrics);
    }

    /**
     * Moves the state to {@link AsyncCheckpointState#DISCARDED} and, if this call performed the
     * transition, cancels the snapshots and declines the checkpoint (only while the task is
     * running). If the state was already discarded, the exception is only logged at trace level.
     *
     * @param e the failure raised while finalizing or reporting; cleanup failures are added to it
     *     as suppressed exceptions
     */
    private void handleExecutionException(Exception e) {
        boolean didCleanup = false;
        AsyncCheckpointState currentState = asyncCheckpointState.get();

        while (AsyncCheckpointState.DISCARDED != currentState) {
            if (!asyncCheckpointState.compareAndSet(currentState, AsyncCheckpointState.DISCARDED)) {
                // The state changed between the read and the CAS; re-read and try again.
                currentState = asyncCheckpointState.get();
                continue;
            }

            didCleanup = true;

            try {
                cleanup();
            } catch (Exception cleanupException) {
                e.addSuppressed(cleanupException);
            }

            final Exception checkpointException =
                    new Exception(
                            "Could not materialize checkpoint "
                                    + checkpointMetaData.getCheckpointId()
                                    + " for operator "
                                    + taskName
                                    + ".",
                            e);

            if (isTaskRunning.get()) {
                declineCheckpoint(checkpointException);
            } else {
                LOG.info(
                        "Ignore decline of checkpoint {} as task is not running anymore.",
                        checkpointMetaData.getCheckpointId());
            }

            currentState = AsyncCheckpointState.DISCARDED;
        }

        if (!didCleanup) {
            LOG.trace(
                    "Caught followup exception from a failed checkpoint thread. This can be ignored.",
                    e);
        }
    }

    /**
     * Declines the checkpoint through the task environment.
     *
     * <p>If the cause chain already contains a {@link CheckpointException}, its failure reason is
     * reused; otherwise {@code CHECKPOINT_ASYNC_EXCEPTION} is reported. Any exception raised while
     * declining is forwarded to the async exception handler.
     */
    private void declineCheckpoint(Exception checkpointException) {
        try {
            final CheckpointFailureReason reportedFailureReason =
                    ExceptionUtils.findThrowable(checkpointException, CheckpointException.class)
                            .map(CheckpointException::getCheckpointFailureReason)
                            .orElse(CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION);
            taskEnvironment.declineCheckpoint(
                    checkpointMetaData.getCheckpointId(),
                    new CheckpointException(reportedFailureReason, checkpointException));
        } catch (Exception unhandled) {
            asyncExceptionHandler.handleAsyncException(
                    "Failure in asynchronous checkpoint materialization",
                    new AsynchronousException(unhandled));
        }
    }

    /**
     * Aborts the runnable if it is still running: cancels all operator snapshots and reports
     * incomplete checkpoint metrics. Cleanup failures are logged, not thrown. If the state is no
     * longer {@link AsyncCheckpointState#RUNNING}, only a debug message is logged.
     */
    @Override
    public void close() {
        if (!asyncCheckpointState.compareAndSet(
                AsyncCheckpointState.RUNNING, AsyncCheckpointState.DISCARDED)) {
            logFailedCleanupAttempt();
            return;
        }

        try {
            final Tuple2<Long, Long> cancelledSizes = cleanup();
            reportAbortedSnapshotStats(cancelledSizes.f0, cancelledSizes.f1);
        } catch (Exception cleanupException) {
            LOG.warn("Could not properly clean up the async checkpoint runnable.", cleanupException);
        }
    }

    /** Returns the id of the checkpoint this runnable belongs to. */
    public long getCheckpointId() {
        return checkpointMetaData.getCheckpointId();
    }

    /**
     * Returns the future that {@link #run()} completes: normally when finalization did not throw
     * (even if the runnable had been closed before), exceptionally otherwise.
     */
    public CompletableFuture<Void> getFinishedFuture() {
        return finishedFuture;
    }

    /**
     * Cancels every non-null operator snapshot and sums the two values each {@code cancel()}
     * returns. All snapshots are attempted even if some cancellations fail.
     *
     * @return the component-wise sums; {@link #close()} forwards them as total bytes persisted
     *     ({@code f0}) and bytes persisted of this checkpoint ({@code f1})
     * @throws Exception the first cancellation failure, with later ones attached through {@code
     *     ExceptionUtils.firstOrSuppressed}
     */
    private Tuple2<Long, Long> cleanup() throws Exception {
        LOG.debug(
                "Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.",
                checkpointMetaData.getCheckpointId(),
                taskName);

        Exception collectedFailure = null;
        long firstComponentSum = 0L;
        long secondComponentSum = 0L;

        for (OperatorSnapshotFutures snapshotInProgress : operatorSnapshotsInProgress.values()) {
            if (snapshotInProgress == null) {
                continue;
            }
            try {
                final Tuple2<Long, Long> cancelled = snapshotInProgress.cancel();
                firstComponentSum += cancelled.f0;
                secondComponentSum += cancelled.f1;
            } catch (Exception cancelException) {
                collectedFailure =
                        ExceptionUtils.firstOrSuppressed(cancelException, collectedFailure);
            }
        }

        if (collectedFailure != null) {
            throw collectedFailure;
        }
        return Tuple2.of(firstComponentSum, secondComponentSum);
    }

    /** Logs that {@link #close()} found the runnable no longer in the running state. */
    private void logFailedCleanupAttempt() {
        LOG.debug(
                "{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.",
                taskName,
                checkpointMetaData.getCheckpointId());
    }
}
