/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobMaster;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterTestUtils;
import org.apache.flink.runtime.jobmaster.KvStateRegistryGateway;
import org.apache.flink.runtime.jobmaster.utils.JobMasterBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/**
 * Tests for the queryable-state (KvState) related RPCs exposed by the {@link JobMaster}:
 * {@code notifyKvStateRegistered}, {@code notifyKvStateUnregistered} and
 * {@code requestKvStateLocation}.
 *
 * <p>Every test starts a fresh {@link JobMaster} for the shared {@link #JOB_GRAPH} on top of a
 * {@link TestingRpcService} that is shared by all tests of this class.
 */
class JobMasterQueryableStateTest {

    /** Timeout used for slot offering and for requesting the job status. */
    private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10L);

    /** Parallelism of both job vertices; also the number of slots offered to the job master. */
    private static final int PARALLELISM = 4;

    /** Max parallelism configured on both job vertices. */
    private static final int MAX_PARALLELISM = 16;

    private static final JobVertex JOB_VERTEX_1;
    private static final JobVertex JOB_VERTEX_2;
    private static final JobGraph JOB_GRAPH;

    private static TestingRpcService rpcService;

    static {
        JOB_VERTEX_1 = createJobVertex("v1");
        JOB_VERTEX_2 = createJobVertex("v2");

        // Both vertices share a single slot sharing group.
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JOB_VERTEX_1.setSlotSharingGroup(slotSharingGroup);
        JOB_VERTEX_2.setSlotSharingGroup(slotSharingGroup);

        JOB_GRAPH = JobGraphTestUtils.streamingJobGraph(JOB_VERTEX_1, JOB_VERTEX_2);
        JOB_GRAPH.setJobType(JobType.STREAMING);
    }

    // Lifecycle methods are package-private: the JUnit 5 user guide states that
    // lifecycle methods must not be private.

    @BeforeAll
    static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterEach
    void teardown() throws Exception {
        rpcService.clearGateways();
    }

    @AfterAll
    static void teardownClass() {
        if (rpcService != null) {
            // Shutdown is only triggered here; the returned future is not awaited.
            rpcService.closeAsync();
            rpcService = null;
        }
    }

    /** Requesting the location of a state name that was never registered must fail. */
    @Test
    void testRequestKvStateWithoutRegistration() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            Assertions.assertThatThrownBy(
                            () ->
                                    jobMasterGateway
                                            .requestKvStateLocation(JOB_GRAPH.getJobID(), "unknown")
                                            .get())
                    .satisfies(FlinkAssertions.anyCauseMatches(UnknownKvStateLocation.class));
        }
    }

    /** Requesting a state location with a job id other than the running job's must fail. */
    @Test
    void testRequestKvStateOfWrongJob() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            Assertions.assertThatThrownBy(
                            () -> jobMasterGateway.requestKvStateLocation(new JobID(), "unknown").get())
                    .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
        }
    }

    /** Registering state for a job id other than the running job's must fail. */
    @Test
    void testRequestKvStateWithIrrelevantRegistration() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            Assertions.assertThatThrownBy(
                            () ->
                                    registerKvState(
                                            jobMasterGateway,
                                            new JobID(),
                                            new JobVertexID(),
                                            "any-name"))
                    .satisfies(FlinkAssertions.anyCauseMatches(FlinkJobNotFoundException.class));
        }
    }

    /** A registered state must be retrievable with exactly the data it was registered with. */
    @Test
    void testRegisterKvState() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            jobMasterGateway
                    .notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();

            KvStateLocation location =
                    jobMasterGateway
                            .requestKvStateLocation(JOB_GRAPH.getJobID(), registrationName)
                            .get();

            Assertions.assertThat(location.getJobId()).isEqualTo(JOB_GRAPH.getJobID());
            Assertions.assertThat(location.getJobVertexId()).isEqualTo(JOB_VERTEX_1.getID());
            Assertions.assertThat(location.getNumKeyGroups())
                    .isEqualTo(JOB_VERTEX_1.getMaxParallelism());
            Assertions.assertThat(location.getNumRegisteredKeyGroups()).isOne();
            Assertions.assertThat(keyGroupRange.getNumberOfKeyGroups()).isOne();
            Assertions.assertThat(location.getKvStateID(keyGroupRange.getStartKeyGroup()))
                    .isEqualTo(kvStateID);
            Assertions.assertThat(location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()))
                    .isEqualTo(address);
        }
    }

    /** After unregistering a state, requesting its location must fail again. */
    @Test
    void testUnregisterKvState() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "register-me";
            KvStateID kvStateID = new KvStateID();
            KeyGroupRange keyGroupRange = new KeyGroupRange(0, 0);
            InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), 1029);

            jobMasterGateway
                    .notifyKvStateRegistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName,
                            kvStateID,
                            address)
                    .get();
            jobMasterGateway
                    .notifyKvStateUnregistered(
                            JOB_GRAPH.getJobID(),
                            JOB_VERTEX_1.getID(),
                            keyGroupRange,
                            registrationName)
                    .get();

            Assertions.assertThatThrownBy(
                            () ->
                                    jobMasterGateway
                                            .requestKvStateLocation(
                                                    JOB_GRAPH.getJobID(), registrationName)
                                            .get())
                    .as("Expected to fail with an UnknownKvStateLocation.")
                    .isInstanceOf(Exception.class)
                    .hasCauseInstanceOf(UnknownKvStateLocation.class);
        }
    }

    /**
     * Registering the same state name for two different vertices must be rejected and must
     * leave the job in state {@code FAILED} or {@code FAILING}.
     */
    @Test
    void testDuplicatedKvStateRegistrationsFailTask() throws Exception {
        try (JobMaster jobMaster = new JobMasterBuilder(JOB_GRAPH, rpcService).createJobMaster()) {
            jobMaster.start();
            JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
            registerSlotsRequiredForJobExecution(jobMasterGateway, JOB_GRAPH.getJobID());

            String registrationName = "duplicate-me";
            registerKvState(
                    jobMasterGateway, JOB_GRAPH.getJobID(), JOB_VERTEX_1.getID(), registrationName);

            Assertions.assertThatThrownBy(
                            () ->
                                    registerKvState(
                                            jobMasterGateway,
                                            JOB_GRAPH.getJobID(),
                                            JOB_VERTEX_2.getID(),
                                            registrationName))
                    .as("Expected to fail because of clashing registration message.")
                    .isInstanceOf(Exception.class)
                    .hasMessageContaining("Registration name clash");

            // Use an AssertJ assertion instead of the Java `assert` keyword: `assert` is
            // skipped unless the JVM runs with -ea, which would make this check a no-op.
            Assertions.assertThat(jobMasterGateway.requestJobStatus(TESTING_TIMEOUT).get())
                    .isIn(JobStatus.FAILED, JobStatus.FAILING);
        }
    }

    /**
     * Registers a testing task executor at the given job master, offers {@link #PARALLELISM}
     * slots for the job and blocks until the first task submission reaches the task executor.
     *
     * @param jobMasterGateway gateway of the job master to offer the slots to
     * @param jobId id of the job the slots are offered for
     * @throws ExecutionException declared by the original signature; propagated from the
     *     registration / slot offering helper
     * @throws InterruptedException if interrupted while waiting for the first task submission
     */
    private static void registerSlotsRequiredForJobExecution(
            JobMasterGateway jobMasterGateway, JobID jobId)
            throws ExecutionException, InterruptedException {
        OneShotLatch oneTaskSubmittedLatch = new OneShotLatch();
        TestingTaskExecutorGateway taskExecutorGateway =
                new TestingTaskExecutorGatewayBuilder()
                        .setSubmitTaskConsumer(
                                (taskDeploymentDescriptor, jobMasterId) -> {
                                    // Signal the waiting test thread and acknowledge the submission.
                                    oneTaskSubmittedLatch.trigger();
                                    return CompletableFuture.completedFuture(Acknowledge.get());
                                })
                        .createTestingTaskExecutorGateway();

        JobMasterTestUtils.registerTaskExecutorAndOfferSlots(
                rpcService,
                jobMasterGateway,
                jobId,
                PARALLELISM,
                taskExecutorGateway,
                TESTING_TIMEOUT);

        oneTaskSubmittedLatch.await();
    }

    /**
     * Registers a fresh {@link KvStateID} for key group range {@code [0, 0]} under the given
     * name, using the local host and port 1233 as server address, and waits for the result.
     *
     * @param stateRegistryGateway gateway to send the registration to
     * @param jobId id of the job the state is registered for
     * @param jobVertexId id of the vertex the state is registered for
     * @param registrationName name under which the state is registered
     * @throws UnknownHostException if the local host address cannot be resolved
     * @throws ExecutionException if the registration future completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the registration
     */
    private static void registerKvState(
            KvStateRegistryGateway stateRegistryGateway,
            JobID jobId,
            JobVertexID jobVertexId,
            String registrationName)
            throws UnknownHostException, ExecutionException, InterruptedException {
        stateRegistryGateway
                .notifyKvStateRegistered(
                        jobId,
                        jobVertexId,
                        new KeyGroupRange(0, 0),
                        registrationName,
                        new KvStateID(),
                        new InetSocketAddress(InetAddress.getLocalHost(), 1233))
                .get();
    }

    /** Creates a job vertex with the parallelism settings shared by all vertices of this test. */
    private static JobVertex createJobVertex(String name) {
        JobVertex vertex = new JobVertex(name);
        vertex.setParallelism(PARALLELISM);
        vertex.setInvokableClass(AbstractInvokable.class);
        vertex.setMaxParallelism(MAX_PARALLELISM);
        return vertex;
    }
}

