/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.checkpoint.TestingRetrievableStateStorageHelper;
import org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.TriConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class ZooKeeperCompletedCheckpointStoreTest
extends TestLogger {
    @ClassRule
    public static ZooKeeperResource zooKeeperResource = new ZooKeeperResource();

    /**
     * Verifies that converting a checkpoint ID to its path representation and back again
     * yields the original checkpoint ID.
     */
    @Test
    public void testPathConversion() {
        final long checkpointId = 42L;

        final String path = ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpointId);

        Assert.assertEquals(checkpointId, ZooKeeperCompletedCheckpointStore.pathToCheckpointId(path));
    }

    /**
     * Verifies that {@code recover()} fails when retrieving the state of a checkpoint listed
     * in ZooKeeper throws.
     *
     * <p>The only handle reported by the fake ZooKeeper listing throws an
     * {@link ExpectedTestException} from {@code retrieveState()}; the test passes only if that
     * exception (possibly nested in another one) reaches the JUnit runner.
     */
    @Test(expected = ExpectedTestException.class)
    public void testRecoverFailsIfDownloadFails() throws Exception {
        // No cast on the lambda: casting to the raw TriConsumer would type all three
        // parameters as Object and make the calls below unresolvable.
        this.testDownloadInternal((store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                checkpointsInZk.add(this.createHandle(1L, id -> {
                    throw new ExpectedTestException();
                }));
                store.recover();
            } catch (Exception exception) {
                // If the expected exception is anywhere in the cause chain, surface it directly
                // so that the "expected" clause of the @Test annotation matches.
                ExceptionUtils.findThrowable(exception, ExpectedTestException.class)
                        .ifPresent(ExceptionUtils::rethrow);
                // Otherwise propagate whatever was caught so that the test fails.
                ExceptionUtils.rethrow(exception);
            }
        });
    }

    /**
     * Verifies that {@code recover()} does not retrieve checkpoint state when the checkpoints
     * listed in ZooKeeper match the ones the store already holds.
     *
     * <p>The fake ZooKeeper listing contains a handle for checkpoint 1 whose
     * {@code retrieveState()} throws an {@link AssertionError}; the same checkpoint ID is added
     * to the store before recovering, so any retrieval attempt fails the test.
     */
    @Test
    public void testNoDownloadIfCheckpointsNotChanged() throws Exception {
        // No cast on the lambda: casting to the raw TriConsumer would type all three
        // parameters as Object and make the calls below unresolvable.
        this.testDownloadInternal((store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                checkpointsInZk.add(this.createHandle(1L, id -> {
                    throw new AssertionError("retrieveState was attempted for checkpoint " + id);
                }));
                store.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry));
                store.recover();
            } catch (Exception exception) {
                // TriConsumer#accept cannot throw checked exceptions, so wrap and keep the cause.
                throw new RuntimeException(exception);
            }
        });
    }

    /**
     * Verifies that {@code recover()} picks up the checkpoints listed in ZooKeeper when they
     * differ from the ones the store currently holds.
     *
     * <p>The fake ZooKeeper listing contains checkpoints 0 through 10, while only checkpoints 1
     * and 5 are added to the store directly. After recovery the latest checkpoint reported by
     * the store must be checkpoint 10.
     */
    @Test
    public void testDownloadIfCheckpointsChanged() throws Exception {
        // No cast on the lambda: casting to the raw TriConsumer would type all three
        // parameters as Object and make the calls below unresolvable.
        this.testDownloadInternal((store, checkpointsInZk, sharedStateRegistry) -> {
            try {
                final int lastInZk = 10;
                IntStream.rangeClosed(0, lastInZk)
                        .forEach(i -> checkpointsInZk.add(this.createHandle(
                                i,
                                id -> CompletedCheckpointStoreTest.createCheckpoint(id, sharedStateRegistry))));

                store.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry));
                store.addCheckpoint(CompletedCheckpointStoreTest.createCheckpoint(5L, sharedStateRegistry));

                store.recover();

                Assert.assertEquals(lastInZk, store.getLatestCheckpoint(false).getCheckpointID());
            } catch (Exception exception) {
                // TriConsumer#accept cannot throw checked exceptions, so wrap and keep the cause.
                throw new RuntimeException(exception);
            }
        });
    }

    /**
     * Runs {@code test} against a {@link ZooKeeperCompletedCheckpointStore} whose backing
     * {@link ZooKeeperStateHandleStore} returns a caller-controlled list from
     * {@code getAllAndLock()}.
     *
     * <p>The callback receives the store, the mutable list that {@code getAllAndLock()} returns
     * (so it can decide which checkpoints appear to exist in ZooKeeper), and the shared state
     * registry for creating checkpoints. After the callback finishes, normally or exceptionally,
     * the store is shut down with {@link JobStatus#FINISHED}, the Curator client is closed and
     * the registry is closed.
     *
     * @param test scenario to run against the prepared store
     * @throws Exception if setting up or tearing down the store fails
     */
    private void testDownloadInternal(TriConsumer<ZooKeeperCompletedCheckpointStore, List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>, SharedStateRegistry> test) throws Exception {
        final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        try {
            final Configuration configuration = new Configuration();
            configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

            final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointsInZk = new ArrayList<>();

            // Keep a handle on the client so it is closed once the scenario is done;
            // previously the started client was never closed.
            try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
                final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
                        new ZooKeeperStateHandleStore<CompletedCheckpoint>(client, new TestingRetrievableStateStorageHelper<>()) {

                            @Override
                            public List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> getAllAndLock() {
                                // Report whatever the test scenario put into the list.
                                return checkpointsInZk;
                            }
                        };
                final ZooKeeperCompletedCheckpointStore store =
                        new ZooKeeperCompletedCheckpointStore(10, checkpointsInZooKeeper, Executors.directExecutor());
                try {
                    test.accept(store, checkpointsInZk, sharedStateRegistry);
                } finally {
                    // Shut down while the client is still open.
                    store.shutdown(JobStatus.FINISHED);
                }
            }
        } finally {
            // Runs even if shutting down the store or closing the client failed.
            sharedStateRegistry.close();
        }
    }

    /**
     * Builds one entry of the fake ZooKeeper listing: a state handle paired with the path
     * derived from the checkpoint ID.
     *
     * @param id checkpoint ID; passed to {@code checkpointSupplier} when the handle's state is
     *     retrieved and used to derive the path
     * @param checkpointSupplier invoked with {@code id} each time {@code retrieveState()} is
     *     called on the handle
     * @return the handle together with its path
     */
    private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> createHandle(long id, Function<Long, CompletedCheckpoint> checkpointSupplier) {
        // Typed local instead of (Object) casts, which would make Tuple2.of infer
        // Tuple2<Object, Object> and no longer match the return type.
        final RetrievableStateHandle<CompletedCheckpoint> handle = new CheckpointStateHandle(checkpointSupplier, id);
        return Tuple2.of(handle, ZooKeeperCompletedCheckpointStore.checkpointIdToPath(id));
    }

    /**
     * Verifies that a checkpoint is discarded once a newer checkpoint replaces it in the store.
     *
     * <p>After adding checkpoint 0 and then checkpoint 1 to a store created by
     * {@code createZooKeeperCheckpointStore}, the store must contain only checkpoint 1 and
     * checkpoint 0 must have been discarded.
     */
    @Test
    public void testDiscardingSubsumedCheckpoints() throws Exception {
        final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        final Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

        // The client is a resource of the try block so that it is also closed when creating
        // the checkpoint store fails.
        try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
            final ZooKeeperCompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(client);

            final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 =
                    CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            checkpointStore.addCheckpoint(checkpoint1);
            Assert.assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));

            final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint2 =
                    CompletedCheckpointStoreTest.createCheckpoint(1L, sharedStateRegistry);
            checkpointStore.addCheckpoint(checkpoint2);

            final List<CompletedCheckpoint> allCheckpoints = checkpointStore.getAllCheckpoints();
            Assert.assertThat(allCheckpoints, Matchers.contains(checkpoint2));
            Assert.assertThat(allCheckpoints, Matchers.not(Matchers.contains(checkpoint1)));

            // The replaced checkpoint must have been discarded.
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        } finally {
            sharedStateRegistry.close();
        }
    }

    /**
     * Verifies that a checkpoint held by the store is discarded when the store is shut down
     * with {@link JobStatus#FINISHED}.
     */
    @Test
    public void testDiscardingCheckpointsAtShutDown() throws Exception {
        final SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
        final Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperResource.getConnectString());

        // The client is a resource of the try block so that it is also closed when creating
        // the checkpoint store fails.
        try (CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration)) {
            final ZooKeeperCompletedCheckpointStore checkpointStore = this.createZooKeeperCheckpointStore(client);

            final CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint1 =
                    CompletedCheckpointStoreTest.createCheckpoint(0L, sharedStateRegistry);
            checkpointStore.addCheckpoint(checkpoint1);
            Assert.assertThat(checkpointStore.getAllCheckpoints(), Matchers.contains(checkpoint1));

            checkpointStore.shutdown(JobStatus.FINISHED);

            // Shutting down must have discarded the checkpoint.
            CompletedCheckpointStoreTest.verifyCheckpointDiscarded(checkpoint1);
        } finally {
            sharedStateRegistry.close();
        }
    }

    /**
     * Creates a {@link ZooKeeperCompletedCheckpointStore} backed by a state handle store rooted
     * at {@code /checkpoints} on the given client, using a direct executor.
     *
     * <p>The store is constructed with {@code 1} as its first argument (presumably the maximum
     * number of retained checkpoints; confirm against the constructor).
     *
     * @param client Curator client used to create the state handle store
     * @return the newly created checkpoint store
     * @throws Exception if the state handle store cannot be created
     */
    @Nonnull
    private ZooKeeperCompletedCheckpointStore createZooKeeperCheckpointStore(CuratorFramework client) throws Exception {
        final ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
                ZooKeeperUtils.createZooKeeperStateHandleStore(
                        client, "/checkpoints", new TestingRetrievableStateStorageHelper<>());
        return new ZooKeeperCompletedCheckpointStore(1, checkpointsInZooKeeper, Executors.directExecutor());
    }

    private static class CheckpointStateHandle
    implements RetrievableStateHandle<CompletedCheckpoint> {
        private static final long serialVersionUID = 1L;
        private final Function<Long, CompletedCheckpoint> checkpointSupplier;
        private final long id;

        CheckpointStateHandle(Function<Long, CompletedCheckpoint> checkpointSupplier, long id) {
            this.checkpointSupplier = checkpointSupplier;
            this.id = id;
        }

        public CompletedCheckpoint retrieveState() {
            return this.checkpointSupplier.apply(this.id);
        }

        public void discardState() {
        }

        public long getStateSize() {
            return 0L;
        }
    }
}

