package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest.class */
public class TaskExecutorStateChangelogStoragesManagerTest {

    /* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest$TestStateChangelogStorage.class */
    private static class TestStateChangelogStorage implements StateChangelogStorage<ChangelogStateHandle> {
        public boolean closed;

        private TestStateChangelogStorage() {
            this.closed = false;
        }

        public StateChangelogWriter<ChangelogStateHandle> createWriter(String str, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
            return null;
        }

        public StateChangelogHandleReader<ChangelogStateHandle> createReader() {
            return null;
        }

        public void close() {
            this.closed = true;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/TaskExecutorStateChangelogStoragesManagerTest$TestStateChangelogStorageFactory.class */
    private static class TestStateChangelogStorageFactory implements StateChangelogStorageFactory {
        public static String identifier = "test-factory";
        public static PluginManager pluginManager = new PluginManager() { // from class: org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManagerTest.TestStateChangelogStorageFactory.1
            public <P> Iterator<P> load(Class<P> cls) {
                Preconditions.checkArgument(cls.equals(StateChangelogStorageFactory.class));
                return Collections.singletonList(new TestStateChangelogStorageFactory()).iterator();
            }
        };

        private TestStateChangelogStorageFactory() {
        }

        public String getIdentifier() {
            return identifier;
        }

        public StateChangelogStorage<?> createStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup taskManagerJobMetricGroup) {
            return new TestStateChangelogStorage();
        }

        public StateChangelogStorageView<?> createStorageView() throws IOException {
            return new TestStateChangelogStorage();
        }
    }

    @Test
    public void testDuplicatedAllocation() throws IOException {
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertEquals(stateChangelogStorageForJob, taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()));
        Assert.assertNotEquals(stateChangelogStorageForJob, taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(new JobID(1L, 2L), configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()));
        taskExecutorStateChangelogStoragesManager.shutdown();
    }

    @Test
    public void testReleaseForJob() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, TestStateChangelogStorageFactory.identifier);
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertTrue(stateChangelogStorageForJob instanceof TestStateChangelogStorage);
        Assert.assertFalse(((TestStateChangelogStorage) stateChangelogStorageForJob).closed);
        taskExecutorStateChangelogStoragesManager.releaseStateChangelogStorageForJob(jobID);
        Assert.assertTrue(((TestStateChangelogStorage) stateChangelogStorageForJob).closed);
        Assert.assertNotEquals(stateChangelogStorageForJob, taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()));
        taskExecutorStateChangelogStoragesManager.shutdown();
        StateChangelogStorageLoader.initialize((PluginManager) null);
    }

    @Test
    public void testConsistencyAmongTask() throws IOException {
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
        JobID jobID = new JobID(1L, 1L);
        Assert.assertNull(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()));
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
        Assert.assertNull(taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup()));
        JobID jobID2 = new JobID(1L, 2L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertNotNull(stateChangelogStorageForJob);
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
        StateChangelogStorage stateChangelogStorageForJob2 = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertNotNull(stateChangelogStorageForJob2);
        Assert.assertEquals(stateChangelogStorageForJob, stateChangelogStorageForJob2);
        taskExecutorStateChangelogStoragesManager.shutdown();
    }

    @Test
    public void testShutdown() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager taskExecutorStateChangelogStoragesManager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, TestStateChangelogStorageFactory.identifier);
        JobID jobID = new JobID(1L, 1L);
        StateChangelogStorage stateChangelogStorageForJob = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertTrue(stateChangelogStorageForJob instanceof TestStateChangelogStorage);
        Assert.assertFalse(((TestStateChangelogStorage) stateChangelogStorageForJob).closed);
        new JobID(1L, 2L);
        StateChangelogStorage stateChangelogStorageForJob2 = taskExecutorStateChangelogStoragesManager.stateChangelogStorageForJob(jobID, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup());
        Assert.assertTrue(stateChangelogStorageForJob2 instanceof TestStateChangelogStorage);
        Assert.assertFalse(((TestStateChangelogStorage) stateChangelogStorageForJob2).closed);
        taskExecutorStateChangelogStoragesManager.shutdown();
        Assert.assertTrue(((TestStateChangelogStorage) stateChangelogStorageForJob).closed);
        Assert.assertTrue(((TestStateChangelogStorage) stateChangelogStorageForJob2).closed);
        StateChangelogStorageLoader.initialize((PluginManager) null);
    }
}
