package org.apache.flink.runtime.rest.handler.job.checkpoints;

import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.runtime.dispatcher.UnknownOperationKeyException;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.RestMatchers;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.HandlerRequestException;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
import org.apache.flink.runtime.rest.handler.async.OperationResult;
import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointHandlers;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.TriggerId;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointInfo;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.concurrent.FutureUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointHandlersTest.class */
/**
 * Tests for the trigger and status handlers nested in {@link CheckpointHandlers}.
 *
 * <p>Each test wires both handlers to a {@link TestingRestfulGateway} stub, triggers a checkpoint
 * through the trigger handler, and then queries the outcome through the status handler.
 */
class CheckpointHandlersTest {
    private static final Time TIMEOUT = Time.seconds(10);
    private static final JobID JOB_ID = new JobID();
    private static final Long COMPLETED_CHECKPOINT_ID = 123456L;
    private static CheckpointHandlers.CheckpointTriggerHandler checkpointTriggerHandler;
    private static CheckpointHandlers.CheckpointStatusHandler checkpointStatusHandler;

    /** Creates the handlers under test; every test passes its own gateway to handleRequest. */
    @BeforeAll
    static void setUp() throws Exception {
        // The retriever resolves to null: the tests call handleRequest with an explicit gateway.
        final GatewayRetriever<TestingRestfulGateway> gatewayRetriever =
                () -> CompletableFuture.completedFuture(null);
        checkpointTriggerHandler =
                new CheckpointHandlers.CheckpointTriggerHandler(
                        gatewayRetriever, TIMEOUT, Collections.emptyMap());
        checkpointStatusHandler =
                new CheckpointHandlers.CheckpointStatusHandler(
                        gatewayRetriever, TIMEOUT, Collections.emptyMap());
    }

    /** An explicitly requested FULL checkpoint completes and reports the checkpoint id. */
    @Test
    void testCheckpointTriggerCompletedSuccessfully() throws Exception {
        final OperationResult<Long> success = OperationResult.success(COMPLETED_CHECKPOINT_ID);
        final CompletableFuture<CheckpointType> triggeredType = new CompletableFuture<>();
        final TestingRestfulGateway gateway = gatewayReturning(success, triggeredType);

        final TriggerId triggerId = triggerCheckpoint(CheckpointType.FULL, null, gateway);
        final AsynchronousOperationResult<CheckpointInfo> status =
                checkpointStatusHandler
                        .handleRequest(checkpointTriggerStatusRequest(triggerId), gateway)
                        .get();

        Assertions.assertThat(status.queueStatus().getId()).isEqualTo(QueueStatus.Id.COMPLETED);
        Assertions.assertThat(status.resource()).isNotNull();
        Assertions.assertThat(status.resource().getCheckpointId())
                .isEqualTo(COMPLETED_CHECKPOINT_ID);
        Assertions.assertThat(triggeredType.get()).isEqualTo(CheckpointType.FULL);
    }

    /** A request without a checkpoint type reaches the gateway as {@code DEFAULT}. */
    @Test
    void testTriggerCheckpointNoCheckpointType() throws Exception {
        final OperationResult<Long> success = OperationResult.success(COMPLETED_CHECKPOINT_ID);
        final CompletableFuture<CheckpointType> triggeredType = new CompletableFuture<>();
        final TestingRestfulGateway gateway = gatewayReturning(success, triggeredType);

        final TriggerId triggerId = triggerCheckpoint(null, null, gateway);
        final AsynchronousOperationResult<CheckpointInfo> status =
                checkpointStatusHandler
                        .handleRequest(checkpointTriggerStatusRequest(triggerId), gateway)
                        .get();

        Assertions.assertThat(status.queueStatus().getId()).isEqualTo(QueueStatus.Id.COMPLETED);
        Assertions.assertThat(status.resource()).isNotNull();
        Assertions.assertThat(status.resource().getCheckpointId())
                .isEqualTo(COMPLETED_CHECKPOINT_ID);
        Assertions.assertThat(triggeredType.get()).isEqualTo(CheckpointType.DEFAULT);
    }

    /** Requesting an INCREMENTAL checkpoint is rejected before the gateway is invoked. */
    @Test
    void testDisallowTriggeringIncrementalCheckpoint() throws Exception {
        final OperationResult<Long> success = OperationResult.success(COMPLETED_CHECKPOINT_ID);
        final CompletableFuture<CheckpointType> triggeredType = new CompletableFuture<>();
        final TestingRestfulGateway gateway = gatewayReturning(success, triggeredType);

        // The executable must actually issue the request; an empty body can never throw and
        // would make assertThrows fail unconditionally.
        // NOTE(review): expects the handler to raise IllegalStateException directly (not wrapped
        // in an ExecutionException) - confirm against CheckpointTriggerHandler.
        org.junit.jupiter.api.Assertions.assertThrows(
                IllegalStateException.class,
                () ->
                        checkpointTriggerHandler
                                .handleRequest(
                                        triggerCheckpointRequest(CheckpointType.INCREMENTAL, null),
                                        gateway)
                                .get());

        // The gateway's trigger function must never have been reached.
        Assertions.assertThat(triggeredType.isDone()).isFalse();
    }

    /** A failed operation is reported as COMPLETED and carries the serialized failure cause. */
    @Test
    void testCheckpointCompletedWithException() throws Exception {
        final OperationResult<Long> failure =
                OperationResult.failure(new RuntimeException("expected"));
        final TestingRestfulGateway gateway =
                gatewayReturning(failure, new CompletableFuture<>());

        final TriggerId triggerId = triggerCheckpoint(null, null, gateway);
        final AsynchronousOperationResult<CheckpointInfo> status =
                checkpointStatusHandler
                        .handleRequest(checkpointTriggerStatusRequest(triggerId), gateway)
                        .get();

        Assertions.assertThat(status.queueStatus().getId()).isEqualTo(QueueStatus.Id.COMPLETED);
        Assertions.assertThat(status.resource()).isNotNull();
        Assertions.assertThat(status.resource().getFailureCause()).isNotNull();

        final Throwable cause =
                status.resource()
                        .getFailureCause()
                        .deserializeError(ClassLoader.getSystemClassLoader());
        Assertions.assertThat(cause.getMessage()).matches("expected");
        Assertions.assertThat(cause).isInstanceOf(RuntimeException.class);
    }

    /** A caller-supplied trigger id is echoed back and can be used to query the status. */
    @Test
    void testProvidedTriggerId() throws Exception {
        final OperationResult<Long> success = OperationResult.success(COMPLETED_CHECKPOINT_ID);
        final TestingRestfulGateway gateway =
                gatewayReturning(success, new CompletableFuture<>());

        final TriggerId providedTriggerId = new TriggerId();
        final TriggerId returnedTriggerId =
                triggerCheckpoint(CheckpointType.FULL, providedTriggerId, gateway);
        Assertions.assertThat(providedTriggerId).isEqualTo(returnedTriggerId);

        final AsynchronousOperationResult<CheckpointInfo> status =
                checkpointStatusHandler
                        .handleRequest(checkpointTriggerStatusRequest(providedTriggerId), gateway)
                        .get();

        Assertions.assertThat(status.queueStatus().getId()).isEqualTo(QueueStatus.Id.COMPLETED);
        Assertions.assertThat(status.resource()).isNotNull();
        Assertions.assertThat(status.resource().getCheckpointId())
                .isEqualTo(COMPLETED_CHECKPOINT_ID);
    }

    /** Querying an operation key the gateway does not know yields an HTTP 404 error. */
    @Test
    void testQueryStatusOfUnknownOperationReturnsError()
            throws HandlerRequestException, RestHandlerException {
        final TestingRestfulGateway gateway =
                new TestingRestfulGateway.Builder()
                        .setGetCheckpointStatusFunction(
                                key ->
                                        FutureUtils.completedExceptionally(
                                                new UnknownOperationKeyException(key)))
                        .build();

        final CompletableFuture<AsynchronousOperationResult<CheckpointInfo>> statusFuture =
                checkpointStatusHandler.handleRequest(
                        checkpointTriggerStatusRequest(new TriggerId()), gateway);

        // Adapt the Hamcrest matcher to the Predicate that AssertJ's matches(...) expects.
        final Matcher<?> respondsWithNotFound =
                RestMatchers.respondsWithError(HttpResponseStatus.NOT_FOUND);
        Assertions.assertThat(statusFuture).matches(respondsWithNotFound::matches);
    }

    /**
     * Builds a gateway stub whose trigger function records the operation key and the requested
     * checkpoint type, and whose status function returns {@code statusResult} for that key.
     *
     * @param statusResult result handed out by the status function for the recorded key
     * @param triggeredType future completed with the checkpoint type passed to the trigger function
     * @return the configured gateway stub
     */
    private static TestingRestfulGateway gatewayReturning(
            OperationResult<Long> statusResult, CompletableFuture<CheckpointType> triggeredType) {
        // Held as Object so that this file does not need to name the operation key type.
        final AtomicReference<Object> recordedKey = new AtomicReference<>();
        return new TestingRestfulGateway.Builder()
                .setTriggerCheckpointFunction(
                        (key, checkpointType) -> {
                            recordedKey.set(key);
                            triggeredType.complete(checkpointType);
                            return CompletableFuture.completedFuture(Acknowledge.get());
                        })
                .setGetCheckpointStatusFunction(
                        key -> {
                            if (key.equals(recordedKey.get())) {
                                return CompletableFuture.completedFuture(statusResult);
                            }
                            throw new RuntimeException(
                                    "Expected operation key "
                                            + recordedKey.get()
                                            + ", but received "
                                            + key);
                        })
                .build();
    }

    /**
     * Sends a trigger request through the trigger handler and waits for its response.
     *
     * @param checkpointType checkpoint type to request, may be {@code null}
     * @param triggerId trigger id to supply with the request, may be {@code null}
     * @param gateway gateway the handler should talk to
     * @return the trigger id contained in the handler's response
     */
    private static TriggerId triggerCheckpoint(
            @Nullable CheckpointType checkpointType,
            @Nullable TriggerId triggerId,
            TestingRestfulGateway gateway)
            throws Exception {
        return checkpointTriggerHandler
                .handleRequest(triggerCheckpointRequest(checkpointType, triggerId), gateway)
                .get()
                .getTriggerId();
    }

    /**
     * Creates a trigger request for {@link #JOB_ID}.
     *
     * @param checkpointType checkpoint type placed in the request body (tests also pass null)
     * @param triggerId optional trigger id placed in the request body
     */
    private static HandlerRequest<CheckpointTriggerRequestBody> triggerCheckpointRequest(
            CheckpointType checkpointType, @Nullable TriggerId triggerId)
            throws HandlerRequestException {
        return HandlerRequest.resolveParametersAndCreate(
                new CheckpointTriggerRequestBody(checkpointType, triggerId),
                new CheckpointTriggerMessageParameters(),
                Collections.singletonMap("jobid", JOB_ID.toString()),
                Collections.emptyMap(),
                Collections.emptyList());
    }

    /**
     * Creates a status request for {@link #JOB_ID} and the given trigger id.
     *
     * @param triggerId id of the trigger operation whose status is queried
     */
    private static HandlerRequest<EmptyRequestBody> checkpointTriggerStatusRequest(
            TriggerId triggerId) throws HandlerRequestException {
        final HashMap<String, String> pathParameters = new HashMap<>();
        pathParameters.put("jobid", JOB_ID.toString());
        pathParameters.put("triggerid", triggerId.toString());
        return HandlerRequest.resolveParametersAndCreate(
                EmptyRequestBody.getInstance(),
                new CheckpointStatusMessageParameters(),
                pathParameters,
                Collections.emptyMap(),
                Collections.emptyList());
    }
}
