package org.apache.flink.runtime.state;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.DynamicCodeLoadingException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest.class */
public class CheckpointStorageLoaderTest extends TestLogger {

    // JUnit rule supplying the temporary folder from which the tests create fresh directories via newFolder().
    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();
    // Class loader of this test class; it is the loader handed to every CheckpointStorageLoader call in this file.
    private final ClassLoader cl = getClass().getClassLoader();

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$FailingFactory.class */
    static final class FailingFactory implements CheckpointStorageFactory<CheckpointStorage> {
        FailingFactory() {
        }

        public CheckpointStorage createFromConfig(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
            throw new IllegalConfigurationException("fail!");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$LegacyStateBackend.class */
    static final class LegacyStateBackend implements StateBackend, CheckpointStorage {
        LegacyStateBackend() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return null;
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$MockStorage.class */
    public static final class MockStorage implements CheckpointStorage {
        MockStorage() {
        }

        public CompletedCheckpointStorageLocation resolveCheckpoint(String str) throws IOException {
            return null;
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobID) throws IOException {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$ModernStateBackend.class */
    public static final class ModernStateBackend implements StateBackend {
        ModernStateBackend() {
        }

        public <K> CheckpointableKeyedStateBackend<K> createKeyedStateBackend(Environment environment, JobID jobID, String str, TypeSerializer<K> typeSerializer, int i, KeyGroupRange keyGroupRange, TaskKvStateRegistry taskKvStateRegistry, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, Collection<KeyedStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
            return null;
        }

        public OperatorStateBackend createOperatorStateBackend(Environment environment, String str, Collection<OperatorStateHandle> collection, CloseableRegistry closeableRegistry) throws Exception {
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$NormalizedPathMatcher.class */
    public static class NormalizedPathMatcher extends TypeSafeMatcher<Path> {
        private final Path reNormalizedExpected;

        private NormalizedPathMatcher(Path path) {
            this.reNormalizedExpected = path == null ? null : new Path(path.toString());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(Path path) {
            if (this.reNormalizedExpected == null) {
                return path == null;
            }
            return this.reNormalizedExpected.equals(new Path(path.toString()));
        }

        public void describeTo(Description description) {
            description.appendValue(this.reNormalizedExpected);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/state/CheckpointStorageLoaderTest$WorkingFactory.class */
    static final class WorkingFactory implements CheckpointStorageFactory<MockStorage> {
        WorkingFactory() {
        }

        /* renamed from: createFromConfig, reason: merged with bridge method [inline-methods] */
        public MockStorage m495createFromConfig(ReadableConfig readableConfig, ClassLoader classLoader) throws IllegalConfigurationException {
            return new MockStorage();
        }
    }

    /** An empty configuration must yield an empty result from {@code fromConfig}. */
    @Test
    public void testNoCheckpointStorageDefined() throws Exception {
        final Logger noLogger = null;
        final boolean storagePresent =
                CheckpointStorageLoader.fromConfig(new Configuration(), cl, noLogger).isPresent();

        Assert.assertFalse(storagePresent);
    }

    /**
     * A state backend that is itself a {@link CheckpointStorage} must be returned by the loader
     * even when a separate application-defined storage is supplied.
     */
    @Test
    public void testLegacyStateBackendTakesPrecedence() throws Exception {
        final LegacyStateBackend legacy = new LegacyStateBackend();
        final Path noSavepointPath = null;

        final CheckpointStorage loaded =
                CheckpointStorageLoader.load(
                        new MockStorage(), noSavepointPath, legacy, new Configuration(), cl, log);

        Assert.assertEquals("Legacy state backends should always take precedence", legacy, loaded);
    }

    /**
     * With a state backend that is not a {@link CheckpointStorage}, the application-defined
     * storage passed to the loader must be the one returned.
     */
    @Test
    public void testModernStateBackendDoesNotTakePrecedence() throws Exception {
        final ModernStateBackend modern = new ModernStateBackend();
        final MockStorage appStorage = new MockStorage();
        final Path noSavepointPath = null;

        final CheckpointStorage loaded =
                CheckpointStorageLoader.load(
                        appStorage, noSavepointPath, modern, new Configuration(), cl, log);

        Assert.assertEquals(
                "Modern state backends should never take precedence", appStorage, loaded);
    }

    /**
     * Configuring the class name of {@code WorkingFactory} as checkpoint storage must make the
     * loader return the {@code MockStorage} produced by that factory.
     */
    @Test
    public void testLoadingFromFactory() throws Exception {
        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, WorkingFactory.class.getName());

        final CheckpointStorage noAppStorage = null;
        final Path noSavepointPath = null;
        final CheckpointStorage loaded =
                CheckpointStorageLoader.load(
                        noAppStorage, noSavepointPath, new ModernStateBackend(), config, cl, log);

        Assert.assertThat(loaded, Matchers.instanceOf(MockStorage.class));
    }

    /**
     * Checks the storage chosen when none is named: a {@link JobManagerCheckpointStorage} for an
     * empty configuration, and a {@link FileSystemCheckpointStorage} once a checkpoint directory
     * is configured.
     */
    @Test
    public void testDefaultCheckpointStorage() throws Exception {
        final CheckpointStorage noAppStorage = null;
        final Path noSavepointPath = null;

        // Nothing configured at all.
        final CheckpointStorage fromEmptyConfig =
                CheckpointStorageLoader.load(
                        noAppStorage,
                        noSavepointPath,
                        new ModernStateBackend(),
                        new Configuration(),
                        cl,
                        log);
        Assert.assertThat(
                fromEmptyConfig, Matchers.instanceOf(JobManagerCheckpointStorage.class));

        // Only the checkpoint directory configured.
        final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
        final Configuration withCheckpointDir = new Configuration();
        withCheckpointDir.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);

        final CheckpointStorage fromCheckpointDir =
                CheckpointStorageLoader.load(
                        noAppStorage,
                        noSavepointPath,
                        new ModernStateBackend(),
                        withCheckpointDir,
                        cl,
                        log);
        Assert.assertThat(
                fromCheckpointDir, Matchers.instanceOf(FileSystemCheckpointStorage.class));
    }

    /**
     * Exercises three failing storage names in sequence on the same configuration object: a name
     * that cannot be loaded, an existing class that is not a storage factory, and a factory that
     * throws. Each attempt must fail with the exception type caught below.
     */
    @Test
    public void testLoadingFails() throws Exception {
        final Configuration config = new Configuration();
        final CheckpointStorage noAppStorage = null;
        final Path noSavepointPath = null;

        // Attempt 1: "does.not.exist".
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "does.not.exist");
        try {
            CheckpointStorageLoader.load(
                    noAppStorage, noSavepointPath, new ModernStateBackend(), config, cl, log);
            Assert.fail("should fail with exception");
        } catch (DynamicCodeLoadingException ignored) {
            // expected
        }

        // Attempt 2: java.io.File exists but is not a checkpoint storage factory.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, File.class.getName());
        try {
            CheckpointStorageLoader.load(
                    noAppStorage, noSavepointPath, new ModernStateBackend(), config, cl, log);
            Assert.fail("should fail with exception");
        } catch (DynamicCodeLoadingException ignored) {
            // expected
        }

        // Attempt 3: the factory loads fine but throws from createFromConfig.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, FailingFactory.class.getName());
        try {
            CheckpointStorageLoader.load(
                    noAppStorage, noSavepointPath, new ModernStateBackend(), config, cl, log);
            Assert.fail("should fail with exception");
        } catch (IllegalConfigurationException ignored) {
            // expected
        }
    }

    /**
     * Naming {@code "jobmanager"} as checkpoint storage, with nothing else configured, must
     * produce a {@link JobManagerCheckpointStorage} from {@code fromConfig}.
     */
    @Test
    public void testLoadJobManagerStorageNoParameters() throws Exception {
        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");

        final Logger noLogger = null;
        final CheckpointStorage storage =
                CheckpointStorageLoader.fromConfig(config, cl, noLogger).get();

        Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class));
    }

    /**
     * Loads the {@code "jobmanager"} storage by name with a savepoint directory configured, and
     * checks that the resulting {@link JobManagerCheckpointStorage} reports that directory as
     * its savepoint path.
     */
    @Test
    public void testLoadJobManagerStorageWithParameters() throws Exception {
        final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
        final Path expectedSavepointPath = new Path(savepointDir);

        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

        final CheckpointStorage storage =
                CheckpointStorageLoader.fromConfig(config, cl, (Logger) null).get();
        Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class));

        // Narrow only after the instanceOf assertion, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage;
        Assert.assertEquals(expectedSavepointPath, jmStorage.getSavepointPath());
    }

    /**
     * Passes an application-defined {@link JobManagerCheckpointStorage} to the loader and checks
     * that it is completed with the configured savepoint directory while keeping its own
     * maximum state size. The configuration names {@code "filesystem"} as storage; the
     * assertions require that this name is not picked up.
     */
    @Test
    public void testConfigureJobManagerStorage() throws Exception {
        final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
        final Path expectedSavepointPath = new Path(savepointDir);
        final int maxSize = 100;

        final Configuration config = new Configuration();
        // Deliberately a different storage type than the one supplied by the application.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);

        final CheckpointStorage storage =
                CheckpointStorageLoader.load(
                        new JobManagerCheckpointStorage(maxSize),
                        (Path) null,
                        new ModernStateBackend(),
                        config,
                        cl,
                        log);
        Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class));

        // Narrow only after the instanceOf assertion, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage;
        Assert.assertThat(jmStorage.getSavepointPath(), normalizedPath(expectedSavepointPath));
        Assert.assertEquals(maxSize, jmStorage.getMaxStateSize());
    }

    /**
     * Supplies a savepoint path both through the configuration and as the path argument of
     * {@code load}. The assertions require the resulting storage to report the path that was
     * passed as argument, not the configured one.
     */
    @Test
    public void testConfigureJobManagerStorageWithParameters() throws Exception {
        final String savepointDirConfig = new Path(tmp.newFolder().toURI()).toString();
        final Path savepointDirArgument = new Path(tmp.newFolder().toURI());

        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDirConfig);

        final CheckpointStorage storage =
                CheckpointStorageLoader.load(
                        new JobManagerCheckpointStorage(),
                        savepointDirArgument,
                        new ModernStateBackend(),
                        config,
                        cl,
                        log);
        Assert.assertThat(storage, Matchers.instanceOf(JobManagerCheckpointStorage.class));

        // Narrow only after the instanceOf assertion, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final JobManagerCheckpointStorage jmStorage = (JobManagerCheckpointStorage) storage;
        Assert.assertThat(jmStorage.getSavepointPath(), normalizedPath(savepointDirArgument));
    }

    /**
     * Loads the {@code "filesystem"} storage by name with checkpoint directory, savepoint
     * directory, small-file threshold and write-buffer size configured, and checks that the
     * resulting {@link FileSystemCheckpointStorage} reflects all four settings. The expected
     * write-buffer size is the larger of the threshold and the configured buffer size.
     */
    @Test
    public void testLoadFileSystemCheckpointStorage() throws Exception {
        final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
        final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
        final Path expectedCheckpointsPath = new Path(checkpointDir);
        final Path expectedSavepointsPath = new Path(savepointDir);
        final MemorySize threshold = MemorySize.parse("900kb");
        final int minWriteBufferSize = 1024;

        final Configuration config = new Configuration();
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
        config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);

        final CheckpointStorage storage =
                CheckpointStorageLoader.fromConfig(config, cl, (Logger) null).get();
        Assert.assertThat(storage, Matchers.instanceOf(FileSystemCheckpointStorage.class));

        // Narrow only after the instanceOf assertion, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final FileSystemCheckpointStorage fs = (FileSystemCheckpointStorage) storage;
        Assert.assertThat(fs.getCheckpointPath(), normalizedPath(expectedCheckpointsPath));
        Assert.assertThat(fs.getSavepointPath(), normalizedPath(expectedSavepointsPath));
        Assert.assertEquals(threshold.getBytes(), fs.getMinFileSizeThreshold());
        Assert.assertEquals(
                Math.max(threshold.getBytes(), (long) minWriteBufferSize),
                fs.getWriteBufferSize());
    }

    /**
     * Passes an application-defined {@link FileSystemCheckpointStorage} to the loader together
     * with a configuration that sets conflicting values. The assertions require that the
     * checkpoint path, file-size threshold and write-buffer size given to the storage's
     * constructor are kept, and that only the savepoint path is taken from the configuration.
     * The configuration also names {@code "jobmanager"} as storage, which must not be picked up.
     */
    @Test
    public void testLoadFileSystemCheckpointStorageMixed() throws Exception {
        final Path appCheckpointDir = new Path(tmp.newFolder().toURI());
        final String checkpointDir = new Path(tmp.newFolder().toURI()).toString();
        final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
        final Path expectedSavepointsPath = new Path(savepointDir);

        final int threshold = 1000000;
        final int writeBufferSize = 4000000;
        final FileSystemCheckpointStorage appStorage =
                new FileSystemCheckpointStorage(appCheckpointDir, threshold, writeBufferSize);

        final Configuration config = new Configuration();
        // Deliberately a different storage type than the one supplied by the application.
        config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        // The three settings below conflict with the constructor arguments above.
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20"));
        config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000);

        final CheckpointStorage storage =
                CheckpointStorageLoader.load(
                        appStorage, (Path) null, new ModernStateBackend(), config, cl, log);
        Assert.assertThat(storage, Matchers.instanceOf(FileSystemCheckpointStorage.class));

        // Narrow only after the instanceOf assertion, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final FileSystemCheckpointStorage fs = (FileSystemCheckpointStorage) storage;
        Assert.assertThat(fs.getCheckpointPath(), normalizedPath(appCheckpointDir));
        Assert.assertThat(fs.getSavepointPath(), normalizedPath(expectedSavepointsPath));
        Assert.assertEquals(threshold, fs.getMinFileSizeThreshold());
        Assert.assertEquals(writeBufferSize, fs.getWriteBufferSize());
    }

    /**
     * Runs the high-availability check twice against the same HA storage path (given in URI
     * form): first without a checkpoint directory, then with one.
     */
    @Test
    public void testHighAvailabilityDefault() throws Exception {
        final String haStoragePath = new Path(tmp.newFolder().toURI()).toString();

        testMemoryBackendHighAvailabilityDefault(haStoragePath, null);

        final Path checkpointPath = new Path(tmp.newFolder().toURI().toString());
        testMemoryBackendHighAvailabilityDefault(haStoragePath, checkpointPath);
    }

    /**
     * Same as {@code testHighAvailabilityDefault}, but the HA storage path is built from the
     * folder's absolute path instead of its URI, and the checkpoint path is qualified against
     * the local file system before use.
     */
    @Test
    public void testHighAvailabilityDefaultLocalPaths() throws Exception {
        final String haStoragePath = new Path(tmp.newFolder().getAbsolutePath()).toString();

        testMemoryBackendHighAvailabilityDefault(haStoragePath, null);

        final Path unqualified = new Path(tmp.newFolder().toURI().toString());
        final Path checkpointPath = unqualified.makeQualified(FileSystem.getLocalFileSystem());
        testMemoryBackendHighAvailabilityDefault(haStoragePath, checkpointPath);
    }

    /**
     * Loads a {@link JobManagerCheckpointStorage} in two ways under a ZooKeeper
     * high-availability configuration: once from an application-supplied instance, and once by
     * the name {@code "jobmanager"}. Both results must have no savepoint path, and a checkpoint
     * path equal to {@code path} (or {@code null} when {@code path} is {@code null}).
     *
     * @param str the value to set as HA storage path in both configurations
     * @param path the checkpoint directory to configure, or {@code null} to configure none
     * @throws Exception if loading the storage fails
     */
    private void testMemoryBackendHighAvailabilityDefault(String str, Path path) throws Exception {
        // Configuration for the application-supplied storage: HA settings only.
        final Configuration appStorageConfig = new Configuration();
        appStorageConfig.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        appStorageConfig.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        appStorageConfig.set(HighAvailabilityOptions.HA_STORAGE_PATH, str);

        // Configuration for loading by name: same HA settings plus the storage name.
        final Configuration namedStorageConfig = new Configuration();
        namedStorageConfig.set(CheckpointingOptions.CHECKPOINT_STORAGE, "jobmanager");
        namedStorageConfig.set(HighAvailabilityOptions.HA_MODE, "zookeeper");
        namedStorageConfig.set(HighAvailabilityOptions.HA_CLUSTER_ID, "myCluster");
        namedStorageConfig.set(HighAvailabilityOptions.HA_STORAGE_PATH, str);

        if (path != null) {
            appStorageConfig.set(
                    CheckpointingOptions.CHECKPOINTS_DIRECTORY, path.toUri().toString());
            namedStorageConfig.set(
                    CheckpointingOptions.CHECKPOINTS_DIRECTORY, path.toUri().toString());
        }

        final CheckpointStorage appLoaded =
                CheckpointStorageLoader.load(
                        new JobManagerCheckpointStorage(),
                        (Path) null,
                        new ModernStateBackend(),
                        appStorageConfig,
                        cl,
                        log);
        final CheckpointStorage namedLoaded =
                CheckpointStorageLoader.load(
                        (CheckpointStorage) null,
                        (Path) null,
                        new ModernStateBackend(),
                        namedStorageConfig,
                        cl,
                        log);

        Assert.assertThat(appLoaded, Matchers.instanceOf(JobManagerCheckpointStorage.class));
        Assert.assertThat(namedLoaded, Matchers.instanceOf(JobManagerCheckpointStorage.class));

        // Narrow only after the instanceOf assertions, so that a wrong type is reported as an
        // assertion failure rather than a ClassCastException.
        final JobManagerCheckpointStorage appStorage = (JobManagerCheckpointStorage) appLoaded;
        final JobManagerCheckpointStorage namedStorage = (JobManagerCheckpointStorage) namedLoaded;

        Assert.assertNull(appStorage.getSavepointPath());
        Assert.assertNull(namedStorage.getSavepointPath());

        if (path != null) {
            Assert.assertThat(appStorage.getCheckpointPath(), normalizedPath(path));
            Assert.assertThat(namedStorage.getCheckpointPath(), normalizedPath(path));
        } else {
            Assert.assertNull(appStorage.getCheckpointPath());
            Assert.assertNull(namedStorage.getCheckpointPath());
        }
    }

    /**
     * Creates a matcher that compares paths via {@code NormalizedPathMatcher}.
     *
     * @param path the expected path; may be {@code null}
     * @return a new matcher for {@code path}
     */
    private static Matcher<Path> normalizedPath(Path path) {
        final Matcher<Path> matcher = new NormalizedPathMatcher(path);
        return matcher;
    }
}
