/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.query;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateEntry;
import org.apache.flink.runtime.query.KvStateInfo;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateRegistryListener;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.junit.jupiter.api.Test;

class KvStateRegistryTest {
    KvStateRegistryTest() {
    }

    @Test
    void testKvStateEntry() throws InterruptedException {
        int threads = 10;
        CountDownLatch latch1 = new CountDownLatch(10);
        CountDownLatch latch2 = new CountDownLatch(1);
        List<KvStateInfo> infos = Collections.synchronizedList(new ArrayList());
        JobID jobID = new JobID();
        JobVertexID jobVertexId = new JobVertexID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        String registrationName = "foobar";
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        KvStateID stateID = kvStateRegistry.registerKvState(jobID, jobVertexId, keyGroupRange, "foobar", (InternalKvState)new DummyKvState(), this.getClass().getClassLoader());
        AtomicReference exceptionHolder = new AtomicReference();
        for (int i = 0; i < 10; ++i) {
            new Thread(() -> {
                KvStateEntry kvState = kvStateRegistry.getKvState(stateID);
                KvStateInfo stateInfo = kvState.getInfoForCurrentThread();
                infos.add(stateInfo);
                latch1.countDown();
                try {
                    latch2.await();
                }
                catch (InterruptedException e) {
                    exceptionHolder.compareAndSet(null, e);
                }
            }).start();
        }
        latch1.await();
        KvStateEntry kvState = kvStateRegistry.getKvState(stateID);
        Assertions.assertThat(infos).hasSize(10);
        Assertions.assertThat((int)kvState.getCacheSize()).isEqualTo(10);
        latch2.countDown();
        for (KvStateInfo infoA : infos) {
            boolean instanceAlreadyFound = false;
            for (KvStateInfo infoB : infos) {
                if (infoA == infoB) {
                    if (instanceAlreadyFound) {
                        Fail.fail((String)"More than one thread sharing the same serializer instance.");
                    }
                    instanceAlreadyFound = true;
                    continue;
                }
                Assertions.assertThat((Object)infoA).isEqualTo((Object)infoB);
            }
        }
        kvStateRegistry.unregisterKvState(jobID, jobVertexId, keyGroupRange, "foobar", stateID);
        Assertions.assertThat((int)kvState.getCacheSize()).isZero();
        Throwable t = (Throwable)exceptionHolder.get();
        Assertions.assertThat((Throwable)t).isNull();
    }

    @Test
    void testKvStateRegistryListenerNotification() {
        JobID jobId1 = new JobID();
        JobID jobId2 = new JobID();
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        ArrayDeque registeredNotifications1 = new ArrayDeque(2);
        ArrayDeque deregisteredNotifications1 = new ArrayDeque(2);
        TestingKvStateRegistryListener listener1 = new TestingKvStateRegistryListener(registeredNotifications1, deregisteredNotifications1);
        ArrayDeque registeredNotifications2 = new ArrayDeque(2);
        ArrayDeque deregisteredNotifications2 = new ArrayDeque(2);
        TestingKvStateRegistryListener listener2 = new TestingKvStateRegistryListener(registeredNotifications2, deregisteredNotifications2);
        kvStateRegistry.registerListener(jobId1, (KvStateRegistryListener)listener1);
        kvStateRegistry.registerListener(jobId2, (KvStateRegistryListener)listener2);
        JobVertexID jobVertexId = new JobVertexID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        String registrationName = "foobar";
        KvStateID kvStateID = kvStateRegistry.registerKvState(jobId1, jobVertexId, keyGroupRange, "foobar", (InternalKvState)new DummyKvState(), this.getClass().getClassLoader());
        Assertions.assertThat((Comparable)((Comparable)registeredNotifications1.poll())).isEqualTo((Object)jobId1);
        Assertions.assertThat(registeredNotifications2).isEmpty();
        JobVertexID jobVertexId2 = new JobVertexID();
        KeyGroupRange keyGroupRange2 = new KeyGroupRange(0, 1);
        String registrationName2 = "barfoo";
        KvStateID kvStateID2 = kvStateRegistry.registerKvState(jobId2, jobVertexId2, keyGroupRange2, "barfoo", (InternalKvState)new DummyKvState(), this.getClass().getClassLoader());
        Assertions.assertThat((Comparable)((Comparable)registeredNotifications2.poll())).isEqualTo((Object)jobId2);
        Assertions.assertThat(registeredNotifications1).isEmpty();
        kvStateRegistry.unregisterKvState(jobId1, jobVertexId, keyGroupRange, "foobar", kvStateID);
        Assertions.assertThat((Comparable)((Comparable)deregisteredNotifications1.poll())).isEqualTo((Object)jobId1);
        Assertions.assertThat(deregisteredNotifications2).isEmpty();
        kvStateRegistry.unregisterKvState(jobId2, jobVertexId2, keyGroupRange2, "barfoo", kvStateID2);
        Assertions.assertThat((Comparable)((Comparable)deregisteredNotifications2.poll())).isEqualTo((Object)jobId2);
        Assertions.assertThat(deregisteredNotifications1).isEmpty();
    }

    @Test
    void testLegacyCodePathPreference() {
        KvStateRegistry kvStateRegistry = new KvStateRegistry();
        ArrayDeque stateRegistrationNotifications = new ArrayDeque(2);
        ArrayDeque stateDeregistrationNotifications = new ArrayDeque(2);
        TestingKvStateRegistryListener testingListener = new TestingKvStateRegistryListener(stateRegistrationNotifications, stateDeregistrationNotifications);
        ArrayDeque anotherQueue = new ArrayDeque(2);
        TestingKvStateRegistryListener anotherListener = new TestingKvStateRegistryListener(anotherQueue, anotherQueue);
        JobID jobId = new JobID();
        kvStateRegistry.registerListener(HighAvailabilityServices.DEFAULT_JOB_ID, (KvStateRegistryListener)testingListener);
        kvStateRegistry.registerListener(jobId, (KvStateRegistryListener)anotherListener);
        JobVertexID jobVertexId = new JobVertexID();
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        String registrationName = "registrationName";
        KvStateID kvStateID = kvStateRegistry.registerKvState(jobId, jobVertexId, keyGroupRange, "registrationName", (InternalKvState)new DummyKvState(), this.getClass().getClassLoader());
        Assertions.assertThat((Comparable)((Comparable)stateRegistrationNotifications.poll())).isEqualTo((Object)jobId);
        Assertions.assertThat(anotherQueue).isEmpty();
        kvStateRegistry.unregisterKvState(jobId, jobVertexId, keyGroupRange, "registrationName", kvStateID);
        Assertions.assertThat((Comparable)((Comparable)stateDeregistrationNotifications.poll())).isEqualTo((Object)jobId);
        Assertions.assertThat(anotherQueue).isEmpty();
    }

    private static class DeepCopyingStringSerializer
    extends TypeSerializer<String> {
        private static final long serialVersionUID = -3744051158625555607L;

        private DeepCopyingStringSerializer() {
        }

        public boolean isImmutableType() {
            return false;
        }

        public TypeSerializer<String> duplicate() {
            return new DeepCopyingStringSerializer();
        }

        public String createInstance() {
            return null;
        }

        public String copy(String from) {
            return null;
        }

        public String copy(String from, String reuse) {
            return null;
        }

        public int getLength() {
            return 0;
        }

        public void serialize(String record, DataOutputView target) throws IOException {
        }

        public String deserialize(DataInputView source) throws IOException {
            return null;
        }

        public String deserialize(String reuse, DataInputView source) throws IOException {
            return null;
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof DeepCopyingStringSerializer;
        }

        public int hashCode() {
            return 0;
        }

        public TypeSerializerSnapshot<String> snapshotConfiguration() {
            return null;
        }
    }

    private static class DummyKvState
    implements InternalKvState<Integer, VoidNamespace, String> {
        private DummyKvState() {
        }

        public TypeSerializer<Integer> getKeySerializer() {
            return IntSerializer.INSTANCE;
        }

        public TypeSerializer<VoidNamespace> getNamespaceSerializer() {
            return VoidNamespaceSerializer.INSTANCE;
        }

        public TypeSerializer<String> getValueSerializer() {
            return new DeepCopyingStringSerializer();
        }

        public void setCurrentNamespace(VoidNamespace namespace) {
        }

        public byte[] getSerializedValue(byte[] serializedKeyAndNamespace, TypeSerializer<Integer> safeKeySerializer, TypeSerializer<VoidNamespace> safeNamespaceSerializer, TypeSerializer<String> safeValueSerializer) throws Exception {
            return serializedKeyAndNamespace;
        }

        public InternalKvState.StateIncrementalVisitor<Integer, VoidNamespace, String> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
            throw new UnsupportedOperationException();
        }

        public void clear() {
        }
    }

    private static final class TestingKvStateRegistryListener
    implements KvStateRegistryListener {
        private final Queue<JobID> stateRegisteredNotifications;
        private final Queue<JobID> stateDeregisteredNotifications;

        private TestingKvStateRegistryListener(Queue<JobID> stateRegisteredNotifications, Queue<JobID> stateDeregisteredNotifications) {
            this.stateRegisteredNotifications = stateRegisteredNotifications;
            this.stateDeregisteredNotifications = stateDeregisteredNotifications;
        }

        public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) {
            this.stateRegisteredNotifications.offer(jobId);
        }

        public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) {
            this.stateDeregisteredNotifications.offer(jobId);
        }
    }
}

