/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageView;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class TaskExecutorStateChangelogStoragesManagerTest {
    TaskExecutorStateChangelogStoragesManagerTest() {
    }

    @Test
    void testDuplicatedAllocation() throws IOException {
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        JobID jobId1 = new JobID(1L, 1L);
        StateChangelogStorage storage1 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        StateChangelogStorage storage2 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage2).isEqualTo((Object)storage1);
        JobID jobId2 = new JobID(1L, 2L);
        StateChangelogStorage storage3 = manager.stateChangelogStorageForJob(jobId2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage3).isNotEqualTo((Object)storage1);
        manager.shutdown();
    }

    @Test
    void testReleaseForJob() throws IOException {
        StateChangelogStorageLoader.initialize((PluginManager)TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, (Object)TestStateChangelogStorageFactory.identifier);
        JobID jobId1 = new JobID(1L, 1L);
        StateChangelogStorage storage1 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage1).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage1).closed).isFalse();
        manager.releaseResourcesForJob(jobId1);
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage1).closed).isTrue();
        StateChangelogStorage storage2 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage2).isNotEqualTo((Object)storage1);
        manager.shutdown();
        StateChangelogStorageLoader.initialize(null);
    }

    @Test
    void testConsistencyAmongTask() throws IOException {
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, (Object)"invalid");
        JobID jobId1 = new JobID(1L, 1L);
        StateChangelogStorage storage1 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage1).isNull();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, StateChangelogOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
        StateChangelogStorage storage2 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage2).isNull();
        JobID jobId2 = new JobID(1L, 2L);
        StateChangelogStorage storage3 = manager.stateChangelogStorageForJob(jobId2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage3).isNotNull();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, (Object)"invalid");
        StateChangelogStorage storage4 = manager.stateChangelogStorageForJob(jobId2, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage4).isNotNull();
        Assertions.assertThat((Object)storage4).isEqualTo((Object)storage3);
        manager.shutdown();
    }

    @Test
    void testShutdown() throws IOException {
        StateChangelogStorageLoader.initialize((PluginManager)TestStateChangelogStorageFactory.pluginManager);
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        Configuration configuration = new Configuration();
        configuration.set(StateChangelogOptions.STATE_CHANGE_LOG_STORAGE, (Object)TestStateChangelogStorageFactory.identifier);
        JobID jobId1 = new JobID(1L, 1L);
        StateChangelogStorage storage1 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage1).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage1).closed).isFalse();
        JobID jobId2 = new JobID(1L, 2L);
        StateChangelogStorage storage2 = manager.stateChangelogStorageForJob(jobId1, configuration, UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(), TestLocalRecoveryConfig.disabled());
        Assertions.assertThat((Object)storage2).isInstanceOf(TestStateChangelogStorage.class);
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage2).closed).isFalse();
        manager.shutdown();
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage1).closed).isTrue();
        Assertions.assertThat((boolean)((TestStateChangelogStorage)storage2).closed).isTrue();
        StateChangelogStorageLoader.initialize(null);
    }

    private static class TestStateChangelogStorageFactory
    implements StateChangelogStorageFactory {
        public static String identifier = "test-factory";
        public static PluginManager pluginManager = new PluginManager(){

            public <P> Iterator<P> load(Class<P> service) {
                Preconditions.checkArgument((boolean)service.equals(StateChangelogStorageFactory.class));
                return Collections.singletonList(new TestStateChangelogStorageFactory()).iterator();
            }
        };

        private TestStateChangelogStorageFactory() {
        }

        public String getIdentifier() {
            return identifier;
        }

        public StateChangelogStorage<?> createStorage(JobID jobID, Configuration configuration, TaskManagerJobMetricGroup metricGroup, LocalRecoveryConfig localRecoveryConfig) {
            return new TestStateChangelogStorage();
        }

        public StateChangelogStorageView<?> createStorageView(Configuration configuration) throws IOException {
            return new TestStateChangelogStorage();
        }
    }

    private static class TestStateChangelogStorage
    implements StateChangelogStorage<ChangelogStateHandle> {
        public boolean closed = false;

        private TestStateChangelogStorage() {
        }

        public StateChangelogWriter<ChangelogStateHandle> createWriter(String operatorID, KeyGroupRange keyGroupRange, MailboxExecutor mailboxExecutor) {
            return null;
        }

        public StateChangelogHandleReader<ChangelogStateHandle> createReader() {
            return null;
        }

        public void close() {
            this.closed = true;
        }
    }
}

