package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.zookeeper.org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.class */
public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpointStoreTest {
    private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1);
    private static final String CHECKPOINT_PATH = "/checkpoints";

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase$HeapRetrievableStateHandle.class */
    /**
     * A {@link RetrievableStateHandle} that keeps its state on the heap, in a map shared by all
     * handles of this class, instead of in external storage.
     *
     * <p>The only instance field is the integer {@code key}; the state object itself lives in the
     * static {@code stateMap}. The map is a plain {@link HashMap}, so it provides no
     * synchronization of its own.
     *
     * @param <T> type of the state referenced by this handle
     */
    public static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
        private static final long serialVersionUID = -268548467968932L;

        /** Hands out a distinct key to every handle created in this JVM. */
        private static final AtomicInteger nextKey = new AtomicInteger(0);

        /** Holds the state of every live handle, indexed by the handle's key. */
        private static final HashMap<Integer, Object> stateMap = new HashMap<>();

        /** Key under which this handle's state is stored in {@code stateMap}. */
        private final int key = nextKey.getAndIncrement();

        /**
         * Creates a handle and registers the given state under a freshly allocated key.
         *
         * @param t the state to make retrievable through this handle
         */
        public HeapRetrievableStateHandle(T t) {
            stateMap.put(key, t);
        }

        /**
         * Returns the state registered for this handle's key.
         *
         * @return the stored state, or {@code null} if no entry exists for the key (for example
         *     after {@link #discardState()})
         */
        @SuppressWarnings("unchecked")
        public T retrieveState() {
            // Within this class the only value ever put under this key is the constructor's T.
            return (T) stateMap.get(key);
        }

        /** Removes this handle's entry from the shared map. */
        public void discardState() throws Exception {
            stateMap.remove(key);
        }

        /**
         * Always reports a size of zero.
         *
         * @return {@code 0}
         */
        public long getStateSize() {
            return 0L;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase$HeapStateStorageHelper.class */
    /**
     * {@link RetrievableStateStorageHelper} that "stores" a {@link CompletedCheckpoint} by wrapping
     * it in a {@link HeapRetrievableStateHandle}.
     */
    static class HeapStateStorageHelper implements RetrievableStateStorageHelper<CompletedCheckpoint> {
        HeapStateStorageHelper() {
        }

        /**
         * Wraps the given checkpoint in a new heap-backed handle.
         *
         * @param completedCheckpoint the checkpoint to store
         * @return a handle from which the checkpoint can be retrieved again
         */
        public RetrievableStateHandle<CompletedCheckpoint> store(CompletedCheckpoint completedCheckpoint) throws Exception {
            // Diamond instead of a raw type so the handle's type argument is checked by the compiler.
            return new HeapRetrievableStateHandle<>(completedCheckpoint);
        }
    }

    /** Shuts down the shared ZooKeeper test environment once all tests of this class have run. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (ZOOKEEPER == null) {
            return;
        }
        ZOOKEEPER.shutdown();
    }

    /**
     * Runs before every test method and calls {@code deleteAll()} on the shared ZooKeeper test
     * environment, presumably so that each test starts without nodes left over from earlier tests
     * (confirm against {@code ZooKeeperTestEnvironment}).
     */
    @Before
    public void cleanUp() throws Exception {
        ZOOKEEPER.deleteAll();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a {@link ZooKeeperCompletedCheckpointStore} whose state handle store is rooted at
     * {@code CHECKPOINT_PATH} on the shared test ZooKeeper client and which uses a direct executor.
     *
     * <p>The {@code mo19} prefix comes from the decompiler: the method was renamed from
     * {@code createCompletedCheckpoints} when it was merged with its bridge method.
     *
     * @param i value passed as the first constructor argument of the store (presumably the
     *     maximum number of checkpoints to retain; confirm against the store's constructor)
     * @return a new checkpoint store backed by the test ZooKeeper instance
     */
    @Override // org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest
    public ZooKeeperCompletedCheckpointStore mo19createCompletedCheckpoints(int i) throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        return new ZooKeeperCompletedCheckpointStore(
                i,
                ZooKeeperUtils.createZooKeeperStateHandleStore(
                        client,
                        CHECKPOINT_PATH,
                        new TestingRetrievableStateStorageHelper()),
                Executors.directExecutor());
    }

    /**
     * Adds three checkpoints, recovers the store, and checks that the recovered store still reports
     * all three, returns the newest one as latest, and, after a fourth checkpoint is added, holds
     * exactly checkpoints 1, 2 and 3, all registered with the new shared state registry.
     */
    @Test
    public void testRecover() throws Exception {
        SharedStateRegistry initialRegistry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = mo19createCompletedCheckpoints(3);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint[] initial = {
            createCheckpoint(0L, initialRegistry),
            createCheckpoint(1L, initialRegistry),
            createCheckpoint(2L, initialRegistry)
        };

        // Add everything first, then verify registration, in the same order as before.
        for (CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint : initial) {
            store.addCheckpoint(checkpoint);
        }
        for (CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint : initial) {
            verifyCheckpointRegistered(checkpoint.getOperatorStates().values(), initialRegistry);
        }

        // All three checkpoints must be visible both in ZooKeeper and in the store.
        Assert.assertEquals(3L, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
        Assert.assertEquals(3L, store.getNumberOfRetainedCheckpoints());

        // Recover with a fresh registry.
        initialRegistry.close();
        SharedStateRegistry recoveredRegistry = new SharedStateRegistry();
        store.recover();

        Assert.assertEquals(3L, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
        Assert.assertEquals(3L, store.getNumberOfRetainedCheckpoints());
        Assert.assertEquals(initial[2], store.getLatestCheckpoint(false));

        // After one more checkpoint the store is expected to hold checkpoints 1, 2 and 3.
        List<CompletedCheckpoint> expected = new ArrayList<>(3);
        expected.add(initial[1]);
        expected.add(initial[2]);
        expected.add(createCheckpoint(3L, recoveredRegistry));
        store.addCheckpoint(expected.get(2));

        List<CompletedCheckpoint> actual = store.getAllCheckpoints();
        Assert.assertEquals(expected, actual);
        for (CompletedCheckpoint checkpoint : actual) {
            verifyCheckpointRegistered(checkpoint.getOperatorStates().values(), recoveredRegistry);
        }
    }

    /**
     * Checks that shutting the store down with {@code JobStatus.FINISHED} removes the checkpoint's
     * ZooKeeper node and that a subsequent recover finds no checkpoints.
     */
    @Test
    public void testShutdownDiscardsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistry registry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = mo19createCompletedCheckpoints(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = createCheckpoint(0L, registry);

        store.addCheckpoint(checkpoint);
        Assert.assertEquals(1L, store.getNumberOfRetainedCheckpoints());

        // The node path is needed before and after the shutdown, so build it once.
        String nodePath = CHECKPOINT_PATH
                + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID());
        Assert.assertNotNull(client.checkExists().forPath(nodePath));

        store.shutdown(JobStatus.FINISHED);
        Assert.assertEquals(0L, store.getNumberOfRetainedCheckpoints());
        Assert.assertNull(client.checkExists().forPath(nodePath));

        // Nothing may come back after a recover.
        registry.close();
        store.recover();
        Assert.assertEquals(0L, store.getNumberOfRetainedCheckpoints());
    }

    /**
     * Checks that shutting the store down with {@code JobStatus.SUSPENDED} empties the store's
     * in-memory view but leaves the checkpoint's ZooKeeper node in place without children, so that
     * a subsequent recover returns the same checkpoint as latest.
     */
    @Test
    public void testSuspendKeepsCheckpoints() throws Exception {
        CuratorFramework client = ZOOKEEPER.getClient();
        SharedStateRegistry registry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = mo19createCompletedCheckpoints(1);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint checkpoint = createCheckpoint(0L, registry);

        store.addCheckpoint(checkpoint);
        Assert.assertEquals(1L, store.getNumberOfRetainedCheckpoints());

        String nodePath = CHECKPOINT_PATH
                + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID());
        Assert.assertNotNull(client.checkExists().forPath(nodePath));

        store.shutdown(JobStatus.SUSPENDED);
        Assert.assertEquals(0L, store.getNumberOfRetainedCheckpoints());

        // Keep the Stat in a local: the child count is asserted on right after the null check.
        Stat stat = client.checkExists().forPath(nodePath);
        Assert.assertNotNull("The checkpoint node should exist.", stat);
        Assert.assertEquals("The checkpoint node should not be locked.", 0L, stat.getNumChildren());

        // The suspended checkpoint must be recoverable.
        registry.close();
        store.recover();
        Assert.assertEquals(checkpoint, store.getLatestCheckpoint(false));
    }

    /**
     * Adds checkpoints with ids 9, 10 and 11, recovers the store, and checks that the checkpoint
     * added last is the one reported as latest.
     */
    @Test
    public void testLatestCheckpointRecovery() throws Exception {
        SharedStateRegistry registry = new SharedStateRegistry();
        ZooKeeperCompletedCheckpointStore store = mo19createCompletedCheckpoints(3);

        List<CompletedCheckpoint> checkpoints = new ArrayList<>(3);
        checkpoints.add(createCheckpoint(9L, registry));
        checkpoints.add(createCheckpoint(10L, registry));
        checkpoints.add(createCheckpoint(11L, registry));

        for (CompletedCheckpoint checkpoint : checkpoints) {
            store.addCheckpoint(checkpoint);
        }

        registry.close();
        store.recover();

        Assert.assertEquals(checkpoints.get(checkpoints.size() - 1), store.getLatestCheckpoint(false));
    }

    /**
     * Uses two stores on the same ZooKeeper path. A checkpoint recovered by the second store must
     * not be discarded when the first store replaces it; it is only expected to be discarded once
     * the second store itself adds a newer checkpoint.
     */
    @Test
    public void testConcurrentCheckpointOperations() throws Exception {
        ZooKeeperCompletedCheckpointStore firstStore = mo19createCompletedCheckpoints(1);
        ZooKeeperCompletedCheckpointStore secondStore = mo19createCompletedCheckpoints(1);

        // The first store writes checkpoint 1.
        SharedStateRegistry firstRegistry = new SharedStateRegistry();
        firstStore.addCheckpoint(createCheckpoint(1L, firstRegistry));
        firstRegistry.close();

        // The second store recovers it.
        SharedStateRegistry secondRegistry = new SharedStateRegistry();
        secondStore.recover();
        CompletedCheckpoint recovered = secondStore.getLatestCheckpoint(false);
        Assert.assertTrue(recovered instanceof CompletedCheckpointStoreTest.TestCompletedCheckpoint);
        CompletedCheckpointStoreTest.TestCompletedCheckpoint recoveredCheckpoint =
                (CompletedCheckpointStoreTest.TestCompletedCheckpoint) recovered;
        Assert.assertFalse(recoveredCheckpoint.isDiscarded());

        // The first store moves on to checkpoint 2; the recovered checkpoint must survive that.
        CompletedCheckpointStoreTest.TestCompletedCheckpoint secondCheckpoint = createCheckpoint(2L, secondRegistry);
        firstStore.addCheckpoint(secondCheckpoint);
        Assert.assertEquals(Collections.singletonList(secondCheckpoint), firstStore.getAllCheckpoints());
        Assert.assertFalse("The checkpoint should not have been discarded.", recoveredCheckpoint.awaitDiscard(50L));
        Assert.assertFalse(recoveredCheckpoint.isDiscarded());

        // Once the second store adds checkpoint 3, wait for the recovered checkpoint's discard.
        secondStore.addCheckpoint(createCheckpoint(3L, secondRegistry));
        recoveredCheckpoint.awaitDiscard();
    }
}
