/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rest.handler.job.checkpoints;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rest.handler.HandlerRequest;
import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.runtime.rest.handler.job.checkpoints.AbstractCheckpointStatsHandler;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
import org.apache.flink.runtime.rest.messages.util.stats.StatsSummaryDto;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.TestingRestfulGateway;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.shaded.guava33.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava33.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.Test;

class AbstractCheckpointStatsHandlerTest {
    private static final Duration TIMEOUT = Duration.ofSeconds(10L);
    private static final JobID JOB_ID = new JobID();
    private static final CheckpointStatsTracker checkpointStatsTracker = new DefaultCheckpointStatsTracker(10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

    AbstractCheckpointStatsHandlerTest() {
    }

    @Test
    void testRetrieveSnapshotFromCache() throws Exception {
        GatewayRetriever leaderRetriever = () -> CompletableFuture.completedFuture(null);
        CheckpointingStatistics checkpointingStatistics = this.getTestCheckpointingStatistics();
        CheckpointStatsSnapshot checkpointStatsSnapshot1 = AbstractCheckpointStatsHandlerTest.getTestCheckpointStatsSnapshot();
        Cache cache = CacheBuilder.newBuilder().build();
        try (RecordingCheckpointStatsHandler checkpointStatsHandler = new RecordingCheckpointStatsHandler((GatewayRetriever<? extends RestfulGateway>)leaderRetriever, TIMEOUT, Collections.emptyMap(), (MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>)CheckpointingStatisticsHeaders.getInstance(), (Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>)cache, Executors.directExecutor(), checkpointingStatistics);){
            TestingRestfulGateway functioningRestfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setRequestCheckpointStatsSnapshotFunction(jobID -> CompletableFuture.completedFuture(checkpointStatsSnapshot1))).build();
            HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobMessageParameters(), Collections.singletonMap("jobid", JOB_ID.toString()), Collections.emptyMap(), Collections.emptyList());
            AssertionsForClassTypes.assertThat((Object)((CheckpointingStatistics)checkpointStatsHandler.handleRequest(request, functioningRestfulGateway).get())).usingRecursiveComparison().isEqualTo((Object)checkpointingStatistics);
            AssertionsForClassTypes.assertThat((Object)checkpointStatsHandler.getStoredCheckpointStats()).isEqualTo((Object)checkpointStatsSnapshot1);
            CheckpointStatsSnapshot checkpointStatsSnapshot2 = AbstractCheckpointStatsHandlerTest.getTestCheckpointStatsSnapshot();
            TestingRestfulGateway refreshedRestfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setRequestCheckpointStatsSnapshotFunction(jobID -> CompletableFuture.completedFuture(checkpointStatsSnapshot2))).build();
            AssertionsForClassTypes.assertThat((Object)((CheckpointingStatistics)checkpointStatsHandler.handleRequest(request, refreshedRestfulGateway).get())).usingRecursiveComparison().isEqualTo((Object)checkpointingStatistics);
            AssertionsForClassTypes.assertThat((Object)checkpointStatsHandler.getStoredCheckpointStats()).isEqualTo((Object)checkpointStatsSnapshot2);
        }
    }

    @Test
    void testRestExceptionPassedThrough() throws Exception {
        GatewayRetriever leaderRetriever = () -> CompletableFuture.completedFuture(null);
        CheckpointStatsSnapshot checkpointStatsSnapshot1 = AbstractCheckpointStatsHandlerTest.getTestCheckpointStatsSnapshot();
        RestHandlerException restHandlerException = new RestHandlerException("some exception thrown", HttpResponseStatus.INTERNAL_SERVER_ERROR);
        try (ThrowingCheckpointStatsHandler checkpointStatsHandler = new ThrowingCheckpointStatsHandler((GatewayRetriever<? extends RestfulGateway>)leaderRetriever, TIMEOUT, Collections.emptyMap(), (MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>)CheckpointingStatisticsHeaders.getInstance(), (Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>)CacheBuilder.newBuilder().build(), Executors.directExecutor(), restHandlerException);){
            TestingRestfulGateway restfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setRequestCheckpointStatsSnapshotFunction(jobID -> CompletableFuture.completedFuture(checkpointStatsSnapshot1))).build();
            HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobMessageParameters(), Collections.singletonMap("jobid", JOB_ID.toString()), Collections.emptyMap(), Collections.emptyList());
            AssertionsForClassTypes.assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> checkpointStatsHandler.handleRequest(request, restfulGateway).get()).withCause((Throwable)restHandlerException);
        }
    }

    @Test
    void testFlinkJobNotFoundException() throws Exception {
        GatewayRetriever leaderRetriever = () -> CompletableFuture.completedFuture(null);
        CheckpointStatsSnapshot checkpointStatsSnapshot1 = AbstractCheckpointStatsHandlerTest.getTestCheckpointStatsSnapshot();
        CompletableFuture failedFuture = new CompletableFuture();
        failedFuture.completeExceptionally((Throwable)new FlinkJobNotFoundException(JOB_ID));
        try (RecordingCheckpointStatsHandler checkpointStatsHandler = new RecordingCheckpointStatsHandler((GatewayRetriever<? extends RestfulGateway>)leaderRetriever, TIMEOUT, Collections.emptyMap(), (MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters>)CheckpointingStatisticsHeaders.getInstance(), (Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>>)CacheBuilder.newBuilder().build(), Executors.directExecutor(), null);){
            TestingRestfulGateway restfulGateway = ((TestingRestfulGateway.Builder)new TestingRestfulGateway.Builder().setRequestCheckpointStatsSnapshotFunction(jobID -> failedFuture)).build();
            HandlerRequest request = HandlerRequest.resolveParametersAndCreate((RequestBody)EmptyRequestBody.getInstance(), (MessageParameters)new JobMessageParameters(), Collections.singletonMap("jobid", JOB_ID.toString()), Collections.emptyMap(), Collections.emptyList());
            ((CompletableFutureAssert)AssertionsForClassTypes.assertThat((CompletableFuture)checkpointStatsHandler.handleRequest(request, restfulGateway)).isCompletedExceptionally()).failsWithin(Duration.ofSeconds(1L)).withThrowableOfType(ExecutionException.class).withStackTraceContaining("Job %s not found", new Object[]{JOB_ID});
        }
    }

    private static CheckpointStatsSnapshot getTestCheckpointStatsSnapshot() {
        return checkpointStatsTracker.createSnapshot();
    }

    private CheckpointingStatistics getTestCheckpointingStatistics() {
        CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1L, 2L, 3, 4L, 5L);
        CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(new StatsSummaryDto(1L, 1L, 1L, 0.0, 0.0, 0.0, 0.0, 0.0), new StatsSummaryDto(1L, 1L, 1L, 0.0, 0.0, 0.0, 0.0, 0.0), new StatsSummaryDto(2L, 2L, 2L, 0.0, 0.0, 0.0, 0.0, 0.0), new StatsSummaryDto(3L, 3L, 3L, 0.0, 0.0, 0.0, 0.0, 0.0), new StatsSummaryDto(4L, 4L, 4L, 0.0, 0.0, 0.0, 0.0, 0.0), new StatsSummaryDto(5L, 5L, 5L, 0.0, 0.0, 0.0, 0.0, 0.0));
        return new CheckpointingStatistics(counts, summary, new CheckpointingStatistics.LatestCheckpoints(null, null, null, null), Collections.emptyList());
    }

    private static class ThrowingCheckpointStatsHandler
    extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
        private final RestHandlerException exception;

        protected ThrowingCheckpointStatsHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Duration timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders, Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> checkpointStatsSnapshotCache, Executor executor, RestHandlerException exception) {
            super(leaderRetriever, timeout, responseHeaders, messageHeaders, checkpointStatsSnapshotCache, executor);
            this.exception = exception;
        }

        protected CheckpointingStatistics handleCheckpointStatsRequest(HandlerRequest<EmptyRequestBody> request, CheckpointStatsSnapshot checkpointStatsSnapshot) throws RestHandlerException {
            throw this.exception;
        }
    }

    private static class RecordingCheckpointStatsHandler
    extends AbstractCheckpointStatsHandler<CheckpointingStatistics, JobMessageParameters> {
        private final CheckpointingStatistics returnValue;
        private CheckpointStatsSnapshot storedCheckpointStats;

        protected RecordingCheckpointStatsHandler(GatewayRetriever<? extends RestfulGateway> leaderRetriever, Duration timeout, Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders, Cache<JobID, CompletableFuture<CheckpointStatsSnapshot>> checkpointStatsSnapshotCache, Executor executor, CheckpointingStatistics returnValue) {
            super(leaderRetriever, timeout, responseHeaders, messageHeaders, checkpointStatsSnapshotCache, executor);
            this.returnValue = returnValue;
        }

        protected CheckpointingStatistics handleCheckpointStatsRequest(HandlerRequest<EmptyRequestBody> request, CheckpointStatsSnapshot checkpointStatsSnapshot) throws RestHandlerException {
            this.storedCheckpointStats = checkpointStatsSnapshot;
            return this.returnValue;
        }

        public CheckpointStatsSnapshot getStoredCheckpointStats() {
            return this.storedCheckpointStats;
        }
    }
}

