package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskStateManagerImpl.class */
public class TaskStateManagerImpl implements TaskStateManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class);
    private final JobID jobId;
    private final ExecutionAttemptID executionAttemptID;

    @Nullable
    private final JobManagerTaskRestore jobManagerTaskRestore;
    private final TaskLocalStateStore localStateStore;

    @Nullable
    private final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager;

    @Nullable
    private final StateChangelogStorage<?> stateChangelogStorage;
    private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;
    private final CheckpointResponder checkpointResponder;
    private final SequentialChannelStateReader sequentialChannelStateReader;

    public TaskStateManagerImpl(@Nonnull JobID jobID, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore taskLocalStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosableWrapper, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder) {
        this(jobID, executionAttemptID, taskLocalStateStore, fileMergingSnapshotManagerClosableWrapper, stateChangelogStorage, taskExecutorStateChangelogStoragesManager, jobManagerTaskRestore, checkpointResponder, new SequentialChannelStateReaderImpl(jobManagerTaskRestore == null ? new TaskStateSnapshot() : jobManagerTaskRestore.getTaskStateSnapshot()));
    }

    public TaskStateManagerImpl(@Nonnull JobID jobID, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore taskLocalStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManagerClosableWrapper, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder, @Nonnull SequentialChannelStateReaderImpl sequentialChannelStateReaderImpl) {
        this.jobId = jobID;
        this.localStateStore = taskLocalStateStore;
        this.fileMergingSnapshotManager = fileMergingSnapshotManagerClosableWrapper;
        this.stateChangelogStorage = stateChangelogStorage;
        this.changelogStoragesManager = taskExecutorStateChangelogStoragesManager;
        this.jobManagerTaskRestore = jobManagerTaskRestore;
        this.executionAttemptID = executionAttemptID;
        this.checkpointResponder = checkpointResponder;
        this.sequentialChannelStateReader = sequentialChannelStateReaderImpl;
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public void reportInitializationMetrics(SubTaskInitializationMetrics subTaskInitializationMetrics) {
        this.checkpointResponder.reportInitializationMetrics(this.jobId, subTaskInitializationMetrics);
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, @Nullable TaskStateSnapshot taskStateSnapshot, @Nullable TaskStateSnapshot taskStateSnapshot2) {
        long checkpointId = checkpointMetaData.getCheckpointId();
        this.localStateStore.storeLocalState(checkpointId, taskStateSnapshot2);
        this.checkpointResponder.acknowledgeCheckpoint(this.jobId, this.executionAttemptID, checkpointId, checkpointMetrics, taskStateSnapshot);
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public void reportIncompleteTaskStateSnapshots(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
        this.checkpointResponder.reportCheckpointMetrics(this.jobId, this.executionAttemptID, checkpointMetaData.getCheckpointId(), checkpointMetrics);
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public InflightDataRescalingDescriptor getInputRescalingDescriptor() {
        return this.jobManagerTaskRestore == null ? InflightDataRescalingDescriptor.NO_RESCALE : this.jobManagerTaskRestore.getTaskStateSnapshot().getInputRescalingDescriptor();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public InflightDataRescalingDescriptor getOutputRescalingDescriptor() {
        return this.jobManagerTaskRestore == null ? InflightDataRescalingDescriptor.NO_RESCALE : this.jobManagerTaskRestore.getTaskStateSnapshot().getOutputRescalingDescriptor();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public boolean isTaskDeployedAsFinished() {
        if (this.jobManagerTaskRestore == null) {
            return false;
        }
        return this.jobManagerTaskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public Optional<Long> getRestoreCheckpointId() {
        return this.jobManagerTaskRestore == null ? Optional.empty() : Optional.of(Long.valueOf(this.jobManagerTaskRestore.getRestoreCheckpointId()));
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
        OperatorSubtaskState subtaskStateByOperatorID;
        if (this.jobManagerTaskRestore == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }
        OperatorSubtaskState subtaskStateByOperatorID2 = this.jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
        if (subtaskStateByOperatorID2 == null) {
            return PrioritizedOperatorSubtaskState.empty(this.jobManagerTaskRestore.getRestoreCheckpointId());
        }
        long restoreCheckpointId = this.jobManagerTaskRestore.getRestoreCheckpointId();
        TaskStateSnapshot retrieveLocalState = this.localStateStore.retrieveLocalState(restoreCheckpointId);
        this.localStateStore.pruneMatchingCheckpoints(j -> {
            return j != restoreCheckpointId;
        });
        List emptyList = Collections.emptyList();
        if (retrieveLocalState != null && (subtaskStateByOperatorID = retrieveLocalState.getSubtaskStateByOperatorID(operatorID)) != null) {
            emptyList = Collections.singletonList(subtaskStateByOperatorID);
        }
        LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local state store {}.", new Object[]{operatorID, subtaskStateByOperatorID2, emptyList, this.localStateStore});
        return new PrioritizedOperatorSubtaskState.Builder(subtaskStateByOperatorID2, emptyList, Long.valueOf(this.jobManagerTaskRestore.getRestoreCheckpointId())).build();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public Optional<OperatorSubtaskState> getSubtaskJobManagerRestoredState(OperatorID operatorID) {
        OperatorSubtaskState subtaskStateByOperatorID;
        if (this.jobManagerTaskRestore != null && (subtaskStateByOperatorID = this.jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID)) != null) {
            return Optional.of(subtaskStateByOperatorID);
        }
        return Optional.empty();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    @Nonnull
    public LocalRecoveryConfig createLocalRecoveryConfig() {
        return this.localStateStore.getLocalRecoveryConfig();
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    public SequentialChannelStateReader getSequentialChannelStateReader() {
        return this.sequentialChannelStateReader;
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    @Nullable
    public StateChangelogStorage<?> getStateChangelogStorage() {
        return this.stateChangelogStorage;
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    @Nullable
    public StateChangelogStorageView<?> getStateChangelogStorageView(Configuration configuration, ChangelogStateHandle changelogStateHandle) {
        StateChangelogStorageView<?> stateChangelogStorageView = null;
        try {
            stateChangelogStorageView = this.changelogStoragesManager.stateChangelogStorageViewForJob(this.jobId, configuration, changelogStateHandle);
        } catch (IOException e) {
            ExceptionUtils.rethrow(e);
        }
        return stateChangelogStorageView;
    }

    @Override // org.apache.flink.runtime.state.TaskStateManager
    @Nullable
    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
        if (this.fileMergingSnapshotManager == null) {
            return null;
        }
        return this.fileMergingSnapshotManager.get();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.localStateStore.confirmCheckpoint(j);
    }

    public void notifyCheckpointAborted(long j) {
        this.localStateStore.abortCheckpoint(j);
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.sequentialChannelStateReader.close();
        if (this.fileMergingSnapshotManager != null) {
            this.fileMergingSnapshotManager.close();
        }
    }
}
