/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.File;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProviderImpl;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskLocalStateStoreImpl;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class TaskStateManagerImplTest {
    TaskStateManagerImplTest() {
    }

    @Test
    void testStateReportingAndRetrieving() {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = ExecutionGraphTestUtils.createExecutionAttemptId();
        TestCheckpointResponder testCheckpointResponder = new TestCheckpointResponder();
        TestTaskLocalStateStore testTaskLocalStateStore = new TestTaskLocalStateStore();
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();
        TaskStateManager taskStateManager = TaskStateManagerImplTest.taskStateManager(jobID, executionAttemptID, testCheckpointResponder, null, testTaskLocalStateStore, changelogStorage);
        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(74L, 11L);
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
        TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot();
        OperatorID operatorID_1 = new OperatorID(1L, 1L);
        OperatorID operatorID_2 = new OperatorID(2L, 2L);
        OperatorID operatorID_3 = new OperatorID(3L, 3L);
        Assertions.assertThat((boolean)taskStateManager.prioritizedOperatorState(operatorID_1).isRestored()).isFalse();
        Assertions.assertThat((boolean)taskStateManager.prioritizedOperatorState(operatorID_2).isRestored()).isFalse();
        Assertions.assertThat((boolean)taskStateManager.prioritizedOperatorState(operatorID_3).isRestored()).isFalse();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        OperatorSubtaskState jmOperatorSubtaskState_1 = OperatorSubtaskState.builder().setManagedKeyedState(StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange)).build();
        OperatorSubtaskState jmOperatorSubtaskState_2 = OperatorSubtaskState.builder().setRawKeyedState(StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange)).build();
        jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, jmOperatorSubtaskState_1);
        jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, jmOperatorSubtaskState_2);
        TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
        OperatorSubtaskState tmOperatorSubtaskState_1 = OperatorSubtaskState.builder().setManagedKeyedState(StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange)).build();
        tmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, tmOperatorSubtaskState_1);
        taskStateManager.reportTaskStateSnapshots(checkpointMetaData, checkpointMetrics, jmTaskStateSnapshot, tmTaskStateSnapshot);
        TestCheckpointResponder.AcknowledgeReport acknowledgeReport = testCheckpointResponder.getAcknowledgeReports().get(0);
        Assertions.assertThat((long)acknowledgeReport.getCheckpointId()).isEqualTo(checkpointMetaData.getCheckpointId());
        Assertions.assertThat((Object)acknowledgeReport.getCheckpointMetrics()).isEqualTo((Object)checkpointMetrics);
        Assertions.assertThat((Object)acknowledgeReport.getExecutionAttemptID()).isEqualTo((Object)executionAttemptID);
        Assertions.assertThat((Comparable)acknowledgeReport.getJobID()).isEqualTo((Object)jobID);
        Assertions.assertThat((Object)acknowledgeReport.getSubtaskState()).isEqualTo((Object)jmTaskStateSnapshot);
        Assertions.assertThat((Object)testTaskLocalStateStore.retrieveLocalState(checkpointMetaData.getCheckpointId())).isEqualTo((Object)tmTaskStateSnapshot);
        JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(checkpointMetaData.getCheckpointId(), acknowledgeReport.getSubtaskState());
        taskStateManager = TaskStateManagerImplTest.taskStateManager(jobID, executionAttemptID, testCheckpointResponder, taskRestore, testTaskLocalStateStore, changelogStorage);
        PrioritizedOperatorSubtaskState prioritized_1 = taskStateManager.prioritizedOperatorState(operatorID_1);
        PrioritizedOperatorSubtaskState prioritized_2 = taskStateManager.prioritizedOperatorState(operatorID_2);
        PrioritizedOperatorSubtaskState prioritized_3 = taskStateManager.prioritizedOperatorState(operatorID_3);
        Assertions.assertThat((boolean)prioritized_1.isRestored()).isTrue();
        Assertions.assertThat((boolean)prioritized_2.isRestored()).isTrue();
        Assertions.assertThat((boolean)prioritized_3.isRestored()).isTrue();
        Assertions.assertThat((boolean)taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored()).isTrue();
        Iterator prioritizedManagedKeyedState_1 = prioritized_1.getPrioritizedManagedKeyedState().iterator();
        Assertions.assertThat(prioritizedManagedKeyedState_1).hasNext();
        StateObjectCollection current = (StateObjectCollection)prioritizedManagedKeyedState_1.next();
        KeyedStateHandle keyedStateHandleExp = (KeyedStateHandle)tmOperatorSubtaskState_1.getManagedKeyedState().iterator().next();
        KeyedStateHandle keyedStateHandleAct = (KeyedStateHandle)current.iterator().next();
        Assertions.assertThat((Object)keyedStateHandleExp).isSameAs((Object)keyedStateHandleAct);
        Assertions.assertThat(prioritizedManagedKeyedState_1).hasNext();
        current = (StateObjectCollection)prioritizedManagedKeyedState_1.next();
        keyedStateHandleExp = (KeyedStateHandle)jmOperatorSubtaskState_1.getManagedKeyedState().iterator().next();
        keyedStateHandleAct = (KeyedStateHandle)current.iterator().next();
        Assertions.assertThat((Object)keyedStateHandleExp).isSameAs((Object)keyedStateHandleAct);
        Assertions.assertThat(prioritizedManagedKeyedState_1).isExhausted();
        Iterator prioritizedRawKeyedState_2 = prioritized_2.getPrioritizedRawKeyedState().iterator();
        Assertions.assertThat(prioritizedRawKeyedState_2).hasNext();
        current = (StateObjectCollection)prioritizedRawKeyedState_2.next();
        keyedStateHandleExp = (KeyedStateHandle)jmOperatorSubtaskState_2.getRawKeyedState().iterator().next();
        keyedStateHandleAct = (KeyedStateHandle)current.iterator().next();
        Assertions.assertThat((Object)keyedStateHandleExp).isSameAs((Object)keyedStateHandleAct);
        Assertions.assertThat(prioritizedRawKeyedState_2).isExhausted();
    }

    @Test
    void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore(@TempDir Path tmpFolder) throws Exception {
        JobID jobID = new JobID(42L, 43L);
        AllocationID allocationID = new AllocationID(4711L, 23L);
        JobVertexID jobVertexID = new JobVertexID(12L, 34L);
        ExecutionAttemptID executionAttemptID = ExecutionGraphTestUtils.createExecutionAttemptId(jobVertexID);
        TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder();
        Executor directExecutor = Executors.directExecutor();
        File[] allocBaseDirs = new File[]{TempDirUtils.newFolder((Path)tmpFolder), TempDirUtils.newFolder((Path)tmpFolder), TempDirUtils.newFolder((Path)tmpFolder)};
        LocalSnapshotDirectoryProviderImpl directoryProvider = new LocalSnapshotDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);
        LocalRecoveryConfig localRecoveryConfig = LocalRecoveryConfig.backupAndRecoveryEnabled((LocalSnapshotDirectoryProvider)directoryProvider);
        TaskLocalStateStoreImpl taskLocalStateStore = new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor);
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();
        TaskStateManager taskStateManager = TaskStateManagerImplTest.taskStateManager(jobID, executionAttemptID, checkpointResponderMock, null, (TaskLocalStateStore)taskLocalStateStore, changelogStorage);
        LocalRecoveryConfig localRecoveryConfFromTaskLocalStateStore = taskLocalStateStore.getLocalRecoveryConfig();
        LocalRecoveryConfig localRecoveryConfFromTaskStateManager = taskStateManager.createLocalRecoveryConfig();
        for (int i = 0; i < 10; ++i) {
            Assertions.assertThat((File)((LocalSnapshotDirectoryProvider)localRecoveryConfFromTaskLocalStateStore.getLocalStateDirectoryProvider().get()).allocationBaseDirectory((long)i)).isEqualTo((Object)allocBaseDirs[i % allocBaseDirs.length]);
            Assertions.assertThat((File)((LocalSnapshotDirectoryProvider)localRecoveryConfFromTaskStateManager.getLocalStateDirectoryProvider().get()).allocationBaseDirectory((long)i)).isEqualTo((Object)allocBaseDirs[i % allocBaseDirs.length]);
        }
        Assertions.assertThat((boolean)localRecoveryConfFromTaskStateManager.isLocalRecoveryEnabled()).isEqualTo(localRecoveryConfFromTaskLocalStateStore.isLocalRecoveryEnabled());
    }

    @Test
    void testStateRetrievingWithFinishedOperator() {
        TaskStateSnapshot taskStateSnapshot = TaskStateSnapshot.FINISHED_ON_RESTORE;
        JobManagerTaskRestore jobManagerTaskRestore = new JobManagerTaskRestore(2L, taskStateSnapshot);
        TaskStateManagerImpl stateManager = new TaskStateManagerImpl(new JobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), (TaskLocalStateStore)new TestTaskLocalStateStore(), null, null, new TaskExecutorStateChangelogStoragesManager(), jobManagerTaskRestore, (CheckpointResponder)new TestCheckpointResponder());
        Assertions.assertThat((boolean)stateManager.isTaskDeployedAsFinished()).isTrue();
    }

    void testAcquringRestoreCheckpointId() {
        TaskStateManagerImpl emptyStateManager = new TaskStateManagerImpl(new JobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), (TaskLocalStateStore)new TestTaskLocalStateStore(), null, null, new TaskExecutorStateChangelogStoragesManager(), null, (CheckpointResponder)new TestCheckpointResponder());
        Assertions.assertThat((Optional)emptyStateManager.getRestoreCheckpointId()).isNotPresent();
        TaskStateManagerImpl nonEmptyStateManager = new TaskStateManagerImpl(new JobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), (TaskLocalStateStore)new TestTaskLocalStateStore(), null, null, new TaskExecutorStateChangelogStoragesManager(), new JobManagerTaskRestore(2L, new TaskStateSnapshot()), (CheckpointResponder)new TestCheckpointResponder());
        Assertions.assertThat((Optional)nonEmptyStateManager.getRestoreCheckpointId()).hasValue((Object)2L);
    }

    private static TaskStateManager taskStateManager(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointResponder checkpointResponderMock, JobManagerTaskRestore jobManagerTaskRestore, TaskLocalStateStore localStateStore, StateChangelogStorage<?> stateChangelogStorage) {
        return new TaskStateManagerImpl(jobID, executionAttemptID, localStateStore, null, stateChangelogStorage, new TaskExecutorStateChangelogStoragesManager(), jobManagerTaskRestore, checkpointResponderMock);
    }
}

