package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.TaskLocalStateStoreImplTest;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/ChangelogTaskLocalStateStoreTest.class */
/**
 * Tests for {@link ChangelogTaskLocalStateStore}.
 *
 * <p>The class extends {@link TaskLocalStateStoreImplTest} and, before every test, replaces the
 * inherited {@code taskLocalStateStore} with a changelog store. Each overridden test stores
 * snapshots through {@link #storeChangelogStates(long, long)} and additionally checks whether the
 * directory returned by {@code subtaskSpecificCheckpointDirectory(id)} still exists.
 */
class ChangelogTaskLocalStateStoreTest extends TaskLocalStateStoreImplTest {
    /** Directory provider of the most recently created store; used to probe directories on disk. */
    private LocalRecoveryDirectoryProvider localRecoveryDirectoryProvider;

    ChangelogTaskLocalStateStoreTest() {
    }

    /** Runs the parent set-up and then swaps in a {@link ChangelogTaskLocalStateStore}. */
    @Override
    @BeforeEach
    public void before() throws Exception {
        super.before();
        taskLocalStateStore =
                createChangelogTaskLocalStateStore(
                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);
    }

    /**
     * Builds a changelog store over the given base directories and remembers its directory
     * provider in {@link #localRecoveryDirectoryProvider}.
     *
     * @param baseDirs allocation base directories handed to the directory provider
     * @param jobID job the store belongs to
     * @param allocationID allocation the store belongs to
     * @param jobVertexID job vertex the store belongs to
     * @param subtaskIndex index of the subtask the store belongs to
     * @return a new store that runs its work on a direct executor
     */
    @Nonnull
    private ChangelogTaskLocalStateStore createChangelogTaskLocalStateStore(
            File[] baseDirs,
            JobID jobID,
            AllocationID allocationID,
            JobVertexID jobVertexID,
            int subtaskIndex) {
        LocalRecoveryDirectoryProviderImpl directoryProvider =
                new LocalRecoveryDirectoryProviderImpl(baseDirs, jobID, jobVertexID, subtaskIndex);
        this.localRecoveryDirectoryProvider = directoryProvider;

        LocalRecoveryConfig recoveryConfig = new LocalRecoveryConfig(directoryProvider);
        return new ChangelogTaskLocalStateStore(
                jobID,
                allocationID,
                jobVertexID,
                subtaskIndex,
                recoveryConfig,
                Executors.directExecutor());
    }

    /**
     * Stores checkpoints 1-3 (all created with second id 1), prunes every checkpoint whose id is
     * not 2, and expects checkpoints 1 and 3 to be gone and discarded while checkpoint 2 and the
     * directory for id 1 remain.
     */
    @Override
    @Test
    void pruneCheckpoints() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot firstSnapshot =
                storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot secondSnapshot =
                storeChangelogStates(2L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot thirdSnapshot =
                storeChangelogStates(3L, 1L);

        taskLocalStateStore.pruneMatchingCheckpoints(checkpointId -> checkpointId != 2);

        // Checkpoint 3 matched the predicate.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(3L)).isNull();
        Assertions.assertThat(thirdSnapshot.isDiscarded()).isTrue();

        // Checkpoint 1 matched the predicate as well, but the directory for id 1 must survive.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(1L)).isNull();
        Assertions.assertThat(firstSnapshot.isDiscarded()).isTrue();
        Assertions.assertThat(checkMaterializedDirExists(1L)).isTrue();

        // Checkpoint 2 did not match and is still retrievable.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(2L)).isEqualTo(secondSnapshot);
    }

    /**
     * Confirms checkpoint 3 and then checkpoint 4, checking after each confirmation which
     * snapshots were discarded and whether the directory for id 1 still exists.
     */
    @Override
    @Test
    void confirmCheckpoint() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot firstSnapshot =
                storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot secondSnapshot =
                storeChangelogStates(2L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot thirdSnapshot =
                storeChangelogStates(3L, 1L);

        taskLocalStateStore.confirmCheckpoint(3L);

        // Earlier checkpoints are dropped; the directory for id 1 is still expected on disk.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(2L)).isNull();
        Assertions.assertThat(secondSnapshot.isDiscarded()).isTrue();
        Assertions.assertThat(firstSnapshot.isDiscarded()).isTrue();
        Assertions.assertThat(checkMaterializedDirExists(1L)).isTrue();
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(3L)).isEqualTo(thirdSnapshot);

        // Checkpoint 4 is stored with second id 2 instead of 1.
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot fourthSnapshot =
                storeChangelogStates(4L, 2L);

        taskLocalStateStore.confirmCheckpoint(4L);

        // Now the directory for id 1 is expected to be removed.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(3L)).isNull();
        Assertions.assertThat(thirdSnapshot.isDiscarded()).isTrue();
        Assertions.assertThat(checkMaterializedDirExists(1L)).isFalse();
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(4L)).isEqualTo(fourthSnapshot);
    }

    /**
     * Aborts checkpoint 2 and then checkpoint 3, both stored with second id 2, and expects the
     * directory for id 2 to exist after the first abort and to be gone after the second.
     */
    @Override
    @Test
    void abortCheckpoint() throws Exception {
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot firstSnapshot =
                storeChangelogStates(1L, 1L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot secondSnapshot =
                storeChangelogStates(2L, 2L);
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot thirdSnapshot =
                storeChangelogStates(3L, 2L);

        taskLocalStateStore.abortCheckpoint(2L);

        // Checkpoint 2 is gone, yet the directory for id 2 is still expected on disk.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(2L)).isNull();
        Assertions.assertThat(secondSnapshot.isDiscarded()).isTrue();
        Assertions.assertThat(checkMaterializedDirExists(2L)).isTrue();

        // Checkpoints 1 and 3 are untouched by the abort.
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(1L)).isEqualTo(firstSnapshot);
        Assertions.assertThat(checkMaterializedDirExists(1L)).isTrue();
        Assertions.assertThat(taskLocalStateStore.retrieveLocalState(3L)).isEqualTo(thirdSnapshot);

        taskLocalStateStore.abortCheckpoint(3L);

        Assertions.assertThat(checkMaterializedDirExists(2L)).isFalse();
    }

    /**
     * Stores a snapshot for checkpoint 0 and expects a newly created store over the same
     * directories to return an equal snapshot.
     */
    @Override
    @Test
    void retrievePersistedLocalStateFromDisc() {
        TaskStateSnapshot storedSnapshot = createTaskStateSnapshot();
        taskLocalStateStore.storeLocalState(0L, storedSnapshot);

        ChangelogTaskLocalStateStore freshStore =
                createChangelogTaskLocalStateStore(
                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);

        Assertions.assertThat(freshStore.retrieveLocalState(0L)).isEqualTo(storedSnapshot);
    }

    /**
     * Writes four arbitrary bytes into the stored snapshot file of checkpoint 0 and expects a
     * newly created store to return {@code null} for it and the file's parent directory to be
     * gone afterwards.
     */
    @Override
    @Test
    void deletesLocalStateIfRetrievalFails() throws IOException {
        taskLocalStateStore.storeLocalState(0L, createTaskStateSnapshot());

        File snapshotFile = taskLocalStateStore.getTaskStateSnapshotFile(0L);
        byte[] garbage = new byte[]{1, 2, 3, 4};
        Files.write(snapshotFile.toPath(), garbage, StandardOpenOption.WRITE);

        ChangelogTaskLocalStateStore freshStore =
                createChangelogTaskLocalStateStore(
                        allocationBaseDirs, jobID, allocationID, jobVertexID, subtaskIdx);

        Assertions.assertThat(freshStore.retrieveLocalState(0L)).isNull();
        Assertions.assertThat(snapshotFile.getParentFile()).doesNotExist();
    }

    /**
     * Reports whether the subtask-specific checkpoint directory for the given id exists.
     *
     * @param id id passed to {@code subtaskSpecificCheckpointDirectory}
     * @return {@code true} if that directory exists
     */
    private boolean checkMaterializedDirExists(long id) {
        File directory = localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(id);
        return directory.exists();
    }

    /**
     * Makes sure the subtask-specific checkpoint directory for the given id exists, creating it
     * if necessary.
     *
     * @param id id passed to {@code subtaskSpecificCheckpointDirectory}
     * @throws FlinkRuntimeException if the directory is missing and cannot be created
     */
    private void writeToMaterializedDir(long id) {
        File directory = localRecoveryDirectoryProvider.subtaskSpecificCheckpointDirectory(id);
        if (directory.exists() || directory.mkdirs()) {
            return;
        }
        throw new FlinkRuntimeException(
                String.format("Could not create the materialized directory '%s'", directory));
    }

    /**
     * Creates the directory for {@code materializationId}, then builds a testing snapshot holding
     * one changelog keyed-state handle under a fresh operator id and stores it for
     * {@code checkpointId}.
     *
     * @param checkpointId checkpoint id under which the snapshot is stored; also passed as the
     *     fourth and sixth constructor argument of the handle
     * @param materializationId id whose directory is created first and which is passed as the
     *     fifth constructor argument of the handle (presumably the materialization id; confirm
     *     against {@code ChangelogStateBackendHandleImpl})
     * @return the snapshot that was stored
     */
    private TaskLocalStateStoreImplTest.TestingTaskStateSnapshot storeChangelogStates(
            long checkpointId, long materializationId) {
        writeToMaterializedDir(materializationId);

        OperatorID operatorId = new OperatorID();
        TaskLocalStateStoreImplTest.TestingTaskStateSnapshot snapshot =
                new TaskLocalStateStoreImplTest.TestingTaskStateSnapshot();

        ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl changelogHandle =
                new ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl(
                        Collections.emptyList(),
                        Collections.emptyList(),
                        new KeyGroupRange(0, 3),
                        checkpointId,
                        materializationId,
                        checkpointId);
        OperatorSubtaskState subtaskState =
                OperatorSubtaskState.builder().setManagedKeyedState(changelogHandle).build();
        snapshot.putSubtaskStateByOperatorID(operatorId, subtaskState);

        taskLocalStateStore.storeLocalState(checkpointId, snapshot);
        return snapshot;
    }
}
