/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReaderImpl;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.FileMergingSnapshotManagerClosableWrapper;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskStateManagerImpl
implements TaskStateManager {
    private static final Logger LOG = LoggerFactory.getLogger(TaskStateManagerImpl.class);
    private final JobID jobId;
    private final ExecutionAttemptID executionAttemptID;
    @Nullable
    private final JobManagerTaskRestore jobManagerTaskRestore;
    private final TaskLocalStateStore localStateStore;
    @Nullable
    private final FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager;
    @Nullable
    private final StateChangelogStorage<?> stateChangelogStorage;
    private final TaskExecutorStateChangelogStoragesManager changelogStoragesManager;
    private final CheckpointResponder checkpointResponder;
    private final SequentialChannelStateReader sequentialChannelStateReader;

    public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder) {
        this(jobId, executionAttemptID, localStateStore, fileMergingSnapshotManager, stateChangelogStorage, changelogStoragesManager, jobManagerTaskRestore, checkpointResponder, new SequentialChannelStateReaderImpl(jobManagerTaskRestore == null ? new TaskStateSnapshot() : jobManagerTaskRestore.getTaskStateSnapshot()));
    }

    public TaskStateManagerImpl(@Nonnull JobID jobId, @Nonnull ExecutionAttemptID executionAttemptID, @Nonnull TaskLocalStateStore localStateStore, @Nullable FileMergingSnapshotManagerClosableWrapper fileMergingSnapshotManager, @Nullable StateChangelogStorage<?> stateChangelogStorage, @Nonnull TaskExecutorStateChangelogStoragesManager changelogStoragesManager, @Nullable JobManagerTaskRestore jobManagerTaskRestore, @Nonnull CheckpointResponder checkpointResponder, @Nonnull SequentialChannelStateReaderImpl sequentialChannelStateReader) {
        this.jobId = jobId;
        this.localStateStore = localStateStore;
        this.fileMergingSnapshotManager = fileMergingSnapshotManager;
        this.stateChangelogStorage = stateChangelogStorage;
        this.changelogStoragesManager = changelogStoragesManager;
        this.jobManagerTaskRestore = jobManagerTaskRestore;
        this.executionAttemptID = executionAttemptID;
        this.checkpointResponder = checkpointResponder;
        this.sequentialChannelStateReader = sequentialChannelStateReader;
    }

    @Override
    public void reportInitializationMetrics(SubTaskInitializationMetrics subTaskInitializationMetrics) {
        this.checkpointResponder.reportInitializationMetrics(this.jobId, this.executionAttemptID, subTaskInitializationMetrics);
    }

    @Override
    public void reportTaskStateSnapshots(@Nonnull CheckpointMetaData checkpointMetaData, @Nonnull CheckpointMetrics checkpointMetrics, @Nullable TaskStateSnapshot acknowledgedState, @Nullable TaskStateSnapshot localState) {
        long checkpointId = checkpointMetaData.getCheckpointId();
        this.localStateStore.storeLocalState(checkpointId, localState);
        this.checkpointResponder.acknowledgeCheckpoint(this.jobId, this.executionAttemptID, checkpointId, checkpointMetrics, acknowledgedState);
    }

    @Override
    public void reportIncompleteTaskStateSnapshots(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) {
        this.checkpointResponder.reportCheckpointMetrics(this.jobId, this.executionAttemptID, checkpointMetaData.getCheckpointId(), checkpointMetrics);
    }

    @Override
    public InflightDataRescalingDescriptor getInputRescalingDescriptor() {
        if (this.jobManagerTaskRestore == null) {
            return InflightDataRescalingDescriptor.NO_RESCALE;
        }
        return this.jobManagerTaskRestore.getTaskStateSnapshot().getInputRescalingDescriptor();
    }

    @Override
    public InflightDataRescalingDescriptor getOutputRescalingDescriptor() {
        if (this.jobManagerTaskRestore == null) {
            return InflightDataRescalingDescriptor.NO_RESCALE;
        }
        return this.jobManagerTaskRestore.getTaskStateSnapshot().getOutputRescalingDescriptor();
    }

    @Override
    public boolean isTaskDeployedAsFinished() {
        if (this.jobManagerTaskRestore == null) {
            return false;
        }
        return this.jobManagerTaskRestore.getTaskStateSnapshot().isTaskDeployedAsFinished();
    }

    @Override
    public Optional<Long> getRestoreCheckpointId() {
        if (this.jobManagerTaskRestore == null) {
            return Optional.empty();
        }
        return Optional.of(this.jobManagerTaskRestore.getRestoreCheckpointId());
    }

    @Override
    public PrioritizedOperatorSubtaskState prioritizedOperatorState(OperatorID operatorID) {
        OperatorSubtaskState localSubtaskState;
        if (this.jobManagerTaskRestore == null) {
            return PrioritizedOperatorSubtaskState.emptyNotRestored();
        }
        TaskStateSnapshot jobManagerStateSnapshot = this.jobManagerTaskRestore.getTaskStateSnapshot();
        OperatorSubtaskState jobManagerSubtaskState = jobManagerStateSnapshot.getSubtaskStateByOperatorID(operatorID);
        if (jobManagerSubtaskState == null) {
            return PrioritizedOperatorSubtaskState.empty(this.jobManagerTaskRestore.getRestoreCheckpointId());
        }
        long restoreCheckpointId = this.jobManagerTaskRestore.getRestoreCheckpointId();
        TaskStateSnapshot localStateSnapshot = this.localStateStore.retrieveLocalState(restoreCheckpointId);
        this.localStateStore.pruneMatchingCheckpoints(checkpointId -> checkpointId != restoreCheckpointId);
        List<OperatorSubtaskState> alternativesByPriority = Collections.emptyList();
        if (localStateSnapshot != null && (localSubtaskState = localStateSnapshot.getSubtaskStateByOperatorID(operatorID)) != null) {
            alternativesByPriority = Collections.singletonList(localSubtaskState);
        }
        LOG.debug("Operator {} has remote state {} from job manager and local state alternatives {} from local state store {}.", new Object[]{operatorID, jobManagerSubtaskState, alternativesByPriority, this.localStateStore});
        PrioritizedOperatorSubtaskState.Builder builder = new PrioritizedOperatorSubtaskState.Builder(jobManagerSubtaskState, alternativesByPriority, this.jobManagerTaskRestore.getRestoreCheckpointId());
        return builder.build();
    }

    @Override
    public Optional<OperatorSubtaskState> getSubtaskJobManagerRestoredState(OperatorID operatorID) {
        if (this.jobManagerTaskRestore == null) {
            return Optional.empty();
        }
        OperatorSubtaskState state = this.jobManagerTaskRestore.getTaskStateSnapshot().getSubtaskStateByOperatorID(operatorID);
        return state == null ? Optional.empty() : Optional.of(state);
    }

    @Override
    @Nonnull
    public LocalRecoveryConfig createLocalRecoveryConfig() {
        return this.localStateStore.getLocalRecoveryConfig();
    }

    @Override
    public SequentialChannelStateReader getSequentialChannelStateReader() {
        return this.sequentialChannelStateReader;
    }

    @Override
    @Nullable
    public StateChangelogStorage<?> getStateChangelogStorage() {
        return this.stateChangelogStorage;
    }

    @Override
    @Nullable
    public StateChangelogStorageView<?> getStateChangelogStorageView(Configuration configuration, ChangelogStateHandle changelogStateHandle) {
        StateChangelogStorageView<?> storageView = null;
        try {
            storageView = this.changelogStoragesManager.stateChangelogStorageViewForJob(this.jobId, configuration, changelogStateHandle);
        }
        catch (IOException e) {
            ExceptionUtils.rethrow((Throwable)e);
        }
        return storageView;
    }

    @Override
    @Nullable
    public FileMergingSnapshotManager getFileMergingSnapshotManager() {
        return this.fileMergingSnapshotManager == null ? null : this.fileMergingSnapshotManager.get();
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.localStateStore.confirmCheckpoint(checkpointId);
    }

    public void notifyCheckpointAborted(long checkpointId) {
        this.localStateStore.abortCheckpoint(checkpointId);
    }

    @Override
    public void close() throws Exception {
        this.sequentialChannelStateReader.close();
        if (this.fileMergingSnapshotManager != null) {
            this.fileMergingSnapshotManager.close();
        }
    }
}

