package org.apache.flink.test.state;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.changelog.fs.FsStateChangelogOptions;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.TestingPluginManager;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/test/state/ChangelogRecoveryCachingITCase.class */
public class ChangelogRecoveryCachingITCase extends TestLogger {
    private static final int ACCUMULATE_TIME_MILLIS = 500;
    private static final int PARALLELISM = 10;

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private OpenOnceFileSystem fileSystem;
    private MiniClusterWithClientResource cluster;

    /* loaded from: input_file:org/apache/flink/test/state/ChangelogRecoveryCachingITCase$OpenOnceFileSystem.class */
    private static class OpenOnceFileSystem extends LocalFileSystem {
        private final Set<Path> openedPaths;

        private OpenOnceFileSystem() {
            this.openedPaths = new HashSet();
        }

        public FSDataInputStream open(Path path) throws IOException {
            Assert.assertTrue(path + " was already opened", this.openedPaths.add(path));
            return super.open(path);
        }

        public boolean isDistributedFS() {
            return true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean hasOpenedPaths() {
            return !this.openedPaths.isEmpty();
        }
    }

    @Before
    public void before() throws Exception {
        File newFolder = this.temporaryFolder.newFolder();
        OpenOnceFileSystem openOnceFileSystem = new OpenOnceFileSystem();
        this.fileSystem = openOnceFileSystem;
        registerFileSystem(openOnceFileSystem, newFolder.toURI().getScheme());
        Configuration configuration = new Configuration();
        configuration.set(FsStateChangelogOptions.CACHE_IDLE_TIMEOUT, Duration.ofDays(365L));
        FsStateChangelogStorageFactory.configure(configuration, newFolder, Duration.ofMinutes(1L), PARALLELISM);
        this.cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(PARALLELISM).build());
        this.cluster.before();
    }

    @After
    public void after() throws Exception {
        if (this.cluster != null) {
            this.cluster.after();
            this.cluster = null;
        }
        FileSystem.initialize(new Configuration(), (PluginManager) null);
    }

    @Test
    public void test() throws Exception {
        JobID submit = submit(configureJob(this.temporaryFolder.newFolder()), jobGraph -> {
        });
        Thread.sleep(500L);
        String checkpointAndCancel = checkpointAndCancel(submit);
        JobID submit2 = submit(configureJob(this.temporaryFolder.newFolder()), jobGraph2 -> {
            jobGraph2.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(checkpointAndCancel));
        });
        CommonTestUtils.waitForAllTaskRunning(this.cluster.getMiniCluster(), submit2, true);
        this.cluster.getClusterClient().cancel(submit2).get();
        Preconditions.checkState(this.fileSystem.hasOpenedPaths());
    }

    private JobID submit(Configuration configuration, Consumer<JobGraph> consumer) throws InterruptedException, ExecutionException {
        JobGraph createJobGraph = createJobGraph(configuration);
        consumer.accept(createJobGraph);
        return (JobID) this.cluster.getClusterClient().submitJob(createJobGraph).get();
    }

    private JobGraph createJobGraph(Configuration configuration) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).keyBy(l -> {
            return Long.valueOf(l.longValue() % 1000);
        }).map(new RichMapFunction<Long, Long>() { // from class: org.apache.flink.test.state.ChangelogRecoveryCachingITCase.1
            public Long map(Long l2) throws Exception {
                getRuntimeContext().getState(new ValueStateDescriptor("state", Long.class)).update(l2);
                return l2;
            }
        }).addSink(new DiscardingSink());
        return executionEnvironment.getStreamGraph().getJobGraph();
    }

    private Configuration configureJob(File file) {
        Configuration configuration = new Configuration();
        configuration.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        configuration.set(CoreOptions.DEFAULT_PARALLELISM, Integer.valueOf(PARALLELISM));
        configuration.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, true);
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE, CheckpointingMode.EXACTLY_ONCE);
        configuration.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10L));
        configuration.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, file.toURI().toString());
        configuration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, false);
        configuration.set(FsStateChangelogOptions.PREEMPTIVE_PERSIST_THRESHOLD, MemorySize.ofMebiBytes(10L));
        configuration.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_INTERVAL, Duration.ofDays(-1L));
        configuration.set(ExecutionCheckpointingOptions.ENABLE_UNALIGNED, true);
        configuration.set(ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, Duration.ZERO);
        configuration.set(ExecutionCheckpointingOptions.UNALIGNED_MAX_SUBTASKS_PER_CHANNEL_STATE_FILE, 1);
        configuration.set(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED, false);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "none");
        return configuration;
    }

    private String checkpointAndCancel(JobID jobID) throws Exception {
        CommonTestUtils.waitForCheckpoint(jobID, this.cluster.getMiniCluster(), 1);
        this.cluster.getClusterClient().cancel(jobID).get();
        checkStatus(jobID);
        return (String) CommonTestUtils.getLatestCompletedCheckpointPath(jobID, this.cluster.getMiniCluster()).orElseThrow(() -> {
            throw new NoSuchElementException("No checkpoint was created yet");
        });
    }

    private void checkStatus(JobID jobID) throws InterruptedException, ExecutionException {
        if (((JobStatus) this.cluster.getClusterClient().getJobStatus(jobID).get()).isGloballyTerminalState()) {
            ((JobResult) this.cluster.getClusterClient().requestJobResult(jobID).get()).getSerializedThrowable().ifPresent(serializedThrowable -> {
                throw new RuntimeException((Throwable) serializedThrowable);
            });
        }
    }

    private static void registerFileSystem(final FileSystem fileSystem, final String str) {
        FileSystem.initialize(new Configuration(), new TestingPluginManager(Collections.singletonMap(FileSystemFactory.class, Collections.singleton(new FileSystemFactory() { // from class: org.apache.flink.test.state.ChangelogRecoveryCachingITCase.2
            public FileSystem create(URI uri) {
                return fileSystem;
            }

            public String getScheme() {
                return str;
            }
        }).iterator())));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1012430706:
                if (implMethodName.equals("lambda$createJobGraph$b29523fd$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/test/state/ChangelogRecoveryCachingITCase") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Long;)Ljava/lang/Long;")) {
                    return l -> {
                        return Long.valueOf(l.longValue() % 1000);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
