package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskStateManagerImplTest.class */
/**
 * Tests for {@link TaskStateManagerImpl}: reporting checkpoint state, restoring it in
 * prioritized order, forwarding the local recovery configuration, and exposing restore metadata.
 */
public class TaskStateManagerImplTest extends TestLogger {

    /**
     * Reports a checkpoint through one task state manager and then restores from the acknowledged
     * state through a second one, checking both the acknowledgement contents and the order of the
     * prioritized state alternatives.
     */
    @Test
    public void testStateReportingAndRetrieving() {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = ExecutionGraphTestUtils.createExecutionAttemptId();
        TestCheckpointResponder checkpointResponder = new TestCheckpointResponder();
        TestTaskLocalStateStore localStateStore = new TestTaskLocalStateStore();
        InMemoryStateChangelogStorage changelogStorage = new InMemoryStateChangelogStorage();

        // First manager: created without any restore information.
        TaskStateManager reportingManager =
                taskStateManager(
                        jobID,
                        executionAttemptID,
                        checkpointResponder,
                        null,
                        localStateStore,
                        changelogStorage);

        CheckpointMetaData checkpointMetaData = new CheckpointMetaData(74L, 11L);
        CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
        TaskStateSnapshot jobManagerSnapshot = new TaskStateSnapshot();

        OperatorID operatorWithManagedState = new OperatorID(1L, 1L);
        OperatorID operatorWithRawState = new OperatorID(2L, 2L);
        OperatorID operatorWithoutState = new OperatorID(3L, 3L);

        // Without restore information no operator may be flagged as restored.
        Assert.assertFalse(
                reportingManager.prioritizedOperatorState(operatorWithManagedState).isRestored());
        Assert.assertFalse(
                reportingManager.prioritizedOperatorState(operatorWithRawState).isRestored());
        Assert.assertFalse(
                reportingManager.prioritizedOperatorState(operatorWithoutState).isRestored());

        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        OperatorSubtaskState managedStateForJobManager =
                OperatorSubtaskState.builder()
                        .setManagedKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        OperatorSubtaskState rawStateForJobManager =
                OperatorSubtaskState.builder()
                        .setRawKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        jobManagerSnapshot.putSubtaskStateByOperatorID(
                operatorWithManagedState, managedStateForJobManager);
        jobManagerSnapshot.putSubtaskStateByOperatorID(
                operatorWithRawState, rawStateForJobManager);

        // Second snapshot, handed over as the task-local copy; only the first operator has one.
        TaskStateSnapshot localSnapshot = new TaskStateSnapshot();
        OperatorSubtaskState managedStateForLocalStore =
                OperatorSubtaskState.builder()
                        .setManagedKeyedState(
                                StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange))
                        .build();
        localSnapshot.putSubtaskStateByOperatorID(
                operatorWithManagedState, managedStateForLocalStore);

        reportingManager.reportTaskStateSnapshots(
                checkpointMetaData, checkpointMetrics, jobManagerSnapshot, localSnapshot);

        // The responder must have received exactly what was reported.
        TestCheckpointResponder.AcknowledgeReport acknowledgeReport =
                checkpointResponder.getAcknowledgeReports().get(0);
        Assert.assertEquals(
                checkpointMetaData.getCheckpointId(), acknowledgeReport.getCheckpointId());
        Assert.assertEquals(checkpointMetrics, acknowledgeReport.getCheckpointMetrics());
        Assert.assertEquals(executionAttemptID, acknowledgeReport.getExecutionAttemptID());
        Assert.assertEquals(jobID, acknowledgeReport.getJobID());
        Assert.assertEquals(jobManagerSnapshot, acknowledgeReport.getSubtaskState());

        // The local snapshot must be retrievable from the local state store.
        Assert.assertEquals(
                localSnapshot,
                localStateStore.retrieveLocalState(checkpointMetaData.getCheckpointId()));

        // Second manager: restores from the state that was acknowledged above.
        JobManagerTaskRestore taskRestore =
                new JobManagerTaskRestore(
                        checkpointMetaData.getCheckpointId(), acknowledgeReport.getSubtaskState());
        TaskStateManager restoringManager =
                taskStateManager(
                        jobID,
                        executionAttemptID,
                        checkpointResponder,
                        taskRestore,
                        localStateStore,
                        changelogStorage);

        PrioritizedOperatorSubtaskState restoredManagedState =
                restoringManager.prioritizedOperatorState(operatorWithManagedState);
        PrioritizedOperatorSubtaskState restoredRawState =
                restoringManager.prioritizedOperatorState(operatorWithRawState);
        PrioritizedOperatorSubtaskState restoredEmptyState =
                restoringManager.prioritizedOperatorState(operatorWithoutState);

        // With restore information present, every operator is expected to be flagged as restored,
        // including the one without reported state and a previously unknown operator.
        Assert.assertTrue(restoredManagedState.isRestored());
        Assert.assertTrue(restoredRawState.isRestored());
        Assert.assertTrue(restoredEmptyState.isRestored());
        Assert.assertTrue(restoringManager.prioritizedOperatorState(new OperatorID()).isRestored());

        // Managed keyed state: expected order is the locally stored handle first, then the handle
        // that was acknowledged to the responder. Identity (not equality) is checked on purpose.
        Iterator<StateObjectCollection<KeyedStateHandle>> managedAlternatives =
                restoredManagedState.getPrioritizedManagedKeyedState().iterator();
        Assert.assertTrue(managedAlternatives.hasNext());
        Assert.assertSame(
                managedStateForLocalStore.getManagedKeyedState().iterator().next(),
                managedAlternatives.next().iterator().next());
        Assert.assertTrue(managedAlternatives.hasNext());
        Assert.assertSame(
                managedStateForJobManager.getManagedKeyedState().iterator().next(),
                managedAlternatives.next().iterator().next());
        Assert.assertFalse(managedAlternatives.hasNext());

        // Raw keyed state: no local copy was reported, so only the acknowledged handle is expected.
        Iterator<StateObjectCollection<KeyedStateHandle>> rawAlternatives =
                restoredRawState.getPrioritizedRawKeyedState().iterator();
        Assert.assertTrue(rawAlternatives.hasNext());
        Assert.assertSame(
                rawStateForJobManager.getRawKeyedState().iterator().next(),
                rawAlternatives.next().iterator().next());
        Assert.assertFalse(rawAlternatives.hasNext());
    }

    /**
     * Checks that the {@link LocalRecoveryConfig} created by the task state manager agrees with the
     * one held by the underlying {@link TaskLocalStateStoreImpl}.
     *
     * @throws IOException if the temporary directories cannot be created
     */
    @Test
    public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException {
        JobID jobID = new JobID(42L, 43L);
        AllocationID allocationID = new AllocationID(4711L, 23L);
        JobVertexID jobVertexID = new JobVertexID(12L, 34L);
        ExecutionAttemptID executionAttemptID =
                ExecutionGraphTestUtils.createExecutionAttemptId(jobVertexID);
        TestCheckpointResponder checkpointResponder = new TestCheckpointResponder();
        Executor directExecutor = Executors.directExecutor();

        // The folder is managed manually (not as a JUnit rule), so it is cleaned up in finally.
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        try {
            temporaryFolder.create();
            File[] allocationBaseDirs = {
                temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()
            };

            LocalRecoveryDirectoryProviderImpl directoryProvider =
                    new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, 0);
            TaskLocalStateStoreImpl localStateStore =
                    new TaskLocalStateStoreImpl(
                            jobID,
                            allocationID,
                            jobVertexID,
                            13,
                            new LocalRecoveryConfig(directoryProvider),
                            directExecutor);
            TaskStateManager stateManager =
                    taskStateManager(
                            jobID,
                            executionAttemptID,
                            checkpointResponder,
                            null,
                            localStateStore,
                            new InMemoryStateChangelogStorage());

            LocalRecoveryConfig configFromStore = localStateStore.getLocalRecoveryConfig();
            LocalRecoveryConfig configFromManager = stateManager.createLocalRecoveryConfig();
            LocalRecoveryDirectoryProvider providerFromStore =
                    configFromStore.getLocalStateDirectoryProvider().get();
            LocalRecoveryDirectoryProvider providerFromManager =
                    configFromManager.getLocalStateDirectoryProvider().get();

            // The test expects the base directory for index i to cycle through the configured
            // directories, identically for both configs.
            for (int i = 0; i < 10; i++) {
                File expectedBaseDir = allocationBaseDirs[i % allocationBaseDirs.length];
                Assert.assertEquals(expectedBaseDir, providerFromStore.allocationBaseDirectory(i));
                Assert.assertEquals(expectedBaseDir, providerFromManager.allocationBaseDirectory(i));
            }

            Assert.assertEquals(
                    configFromStore.isLocalRecoveryEnabled(),
                    configFromManager.isLocalRecoveryEnabled());
        } finally {
            temporaryFolder.delete();
        }
    }

    /**
     * Checks that a manager restored from {@link TaskStateSnapshot#FINISHED_ON_RESTORE} reports the
     * task as deployed in finished state.
     */
    @Test
    public void testStateRetrievingWithFinishedOperator() {
        TaskStateManagerImpl stateManager =
                new TaskStateManagerImpl(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        new TestTaskLocalStateStore(),
                        (StateChangelogStorage) null,
                        new JobManagerTaskRestore(2L, TaskStateSnapshot.FINISHED_ON_RESTORE),
                        new TestCheckpointResponder());

        Assert.assertTrue(stateManager.isTaskDeployedAsFinished());
    }

    /**
     * Checks that the restore checkpoint id is absent without restore information and equals the
     * id passed in the {@link JobManagerTaskRestore} otherwise.
     */
    @Test
    public void testAcquringRestoreCheckpointId() {
        TaskStateManagerImpl managerWithoutRestore =
                new TaskStateManagerImpl(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        new TestTaskLocalStateStore(),
                        (StateChangelogStorage) null,
                        (JobManagerTaskRestore) null,
                        new TestCheckpointResponder());
        Assert.assertFalse(managerWithoutRestore.getRestoreCheckpointId().isPresent());

        TaskStateManagerImpl managerWithRestore =
                new TaskStateManagerImpl(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        new TestTaskLocalStateStore(),
                        (StateChangelogStorage) null,
                        new JobManagerTaskRestore(2L, new TaskStateSnapshot()),
                        new TestCheckpointResponder());
        Assert.assertEquals(2L, managerWithRestore.getRestoreCheckpointId().get().longValue());
    }

    /**
     * Creates a {@link TaskStateManagerImpl} from the given components.
     *
     * <p>The parameter order here differs from the constructor's; this helper only reorders the
     * arguments and passes them through.
     *
     * @param jobID job the task belongs to
     * @param executionAttemptID execution attempt of the task
     * @param checkpointResponder responder that receives checkpoint acknowledgements
     * @param jobManagerTaskRestore restore information; the tests in this class pass {@code null}
     *     when there is nothing to restore
     * @param taskLocalStateStore store for task-local state
     * @param stateChangelogStorage changelog storage handed to the manager
     * @return the newly created task state manager
     */
    public static TaskStateManager taskStateManager(JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointResponder checkpointResponder, JobManagerTaskRestore jobManagerTaskRestore, TaskLocalStateStore taskLocalStateStore, StateChangelogStorage<?> stateChangelogStorage) {
        return new TaskStateManagerImpl(
                jobID,
                executionAttemptID,
                taskLocalStateStore,
                stateChangelogStorage,
                jobManagerTaskRestore,
                checkpointResponder);
    }
}
