/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.cleanup;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.dispatcher.cleanup.DefaultResourceCleaner;
import org.apache.flink.runtime.dispatcher.cleanup.TestingRetryStrategies;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.RetryStrategy;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests for {@link DefaultResourceCleaner}: concurrent execution of regular cleanups, ordering of
 * prioritized cleanups, and retrying of failing cleanups.
 */
@ExtendWith(TestLoggerExtension.class)
class DefaultResourceCleanerTest {

    /** Executor handed to the cleaner under test as its cleanup executor. */
    private static final Executor EXECUTOR = Executors.directExecutor();

    /** Job id that every test passes to {@code cleanupAsync}. */
    private static final JobID JOB_ID = new JobID();

    /**
     * Two regular cleanups are both triggered right away; the overall result only completes once
     * both of them have completed.
     */
    @Test
    void testSuccessfulConcurrentCleanup() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        cleanup0.completeCleanup();
        Assertions.assertThat(cleanupResult).isNotCompleted();

        cleanup1.completeCleanup();
        Assertions.assertThat(cleanupResult).isCompleted();
    }

    /**
     * The first of two regular cleanups fails; the overall result still waits for the second
     * cleanup and then fails with the first cleanup's exception as root cause.
     */
    @Test
    void testConcurrentCleanupWithExceptionFirst() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        final RuntimeException expectedException = new RuntimeException("Expected exception");
        cleanup0.completeCleanupExceptionally(expectedException);
        Assertions.assertThat(cleanupResult).isNotCompleted();

        cleanup1.completeCleanup();
        assertFailsWithRootCause(cleanupResult, expectedException);
    }

    /**
     * The second of two regular cleanups fails after the first one succeeded; the overall result
     * fails with the second cleanup's exception as root cause.
     */
    @Test
    void testConcurrentCleanupWithExceptionSecond() {
        final SingleCallCleanup cleanup0 = SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup cleanup1 = SingleCallCleanup.withoutCompletionOnCleanup();

        final CompletableFuture<Void> cleanupResult =
                createTestInstanceBuilder()
                        .withRegularCleanup("Reg #0", cleanup0)
                        .withRegularCleanup("Reg #1", cleanup1)
                        .build()
                        .cleanupAsync(JOB_ID);

        Assertions.assertThat(cleanupResult).isNotCompleted();
        Assertions.assertThat(cleanup0)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);
        Assertions.assertThat(cleanup1)
                .extracting(SingleCallCleanup::getProcessedJobId)
                .isEqualTo(JOB_ID);

        cleanup0.completeCleanup();
        Assertions.assertThat(cleanupResult).isNotCompleted();

        final RuntimeException expectedException = new RuntimeException("Expected exception");
        cleanup1.completeCleanupExceptionally(expectedException);
        assertFailsWithRootCause(cleanupResult, expectedException);
    }

    /**
     * While the first prioritized cleanup is pending, neither the second prioritized cleanup nor
     * any regular cleanup is done; completing it lets everything else finish.
     */
    @Test
    void testHighestPriorityCleanupBlocksAllOtherCleanups() {
        final SingleCallCleanup highPriorityCleanup =
                SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup lowerThanHighPriorityCleanup =
                SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup noPriorityCleanup0 = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup noPriorityCleanup1 = SingleCallCleanup.withCompletionOnCleanup();

        final DefaultResourceCleaner<CleanupCallback> testInstance =
                createTestInstanceBuilder()
                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                        .build();

        final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);

        Assertions.assertThat(highPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(noPriorityCleanup0.isDone()).isFalse();
        Assertions.assertThat(noPriorityCleanup1.isDone()).isFalse();
        Assertions.assertThat(overallCleanupResult.isDone()).isFalse();

        highPriorityCleanup.completeCleanup();

        Assertions.assertThat(overallCleanupResult).isCompleted();
        Assertions.assertThat(highPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(noPriorityCleanup0.isDone()).isTrue();
        Assertions.assertThat(noPriorityCleanup1.isDone()).isTrue();
    }

    /**
     * The first prioritized cleanup completes immediately, the second one is pending: the regular
     * cleanups are not done until the second prioritized cleanup has completed.
     */
    @Test
    void testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
        final SingleCallCleanup highPriorityCleanup = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup lowerThanHighPriorityCleanup =
                SingleCallCleanup.withoutCompletionOnCleanup();
        final SingleCallCleanup noPriorityCleanup0 = SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup noPriorityCleanup1 = SingleCallCleanup.withCompletionOnCleanup();

        final DefaultResourceCleaner<CleanupCallback> testInstance =
                createTestInstanceBuilder()
                        .withPrioritizedCleanup("Prio #0", highPriorityCleanup)
                        .withPrioritizedCleanup("Prio #1", lowerThanHighPriorityCleanup)
                        .withRegularCleanup("Reg #0", noPriorityCleanup0)
                        .withRegularCleanup("Reg #1", noPriorityCleanup1)
                        .build();

        // Nothing has been triggered before cleanupAsync is called.
        Assertions.assertThat(highPriorityCleanup.isDone()).isFalse();

        final CompletableFuture<Void> overallCleanupResult = testInstance.cleanupAsync(JOB_ID);

        Assertions.assertThat(highPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerThanHighPriorityCleanup.isDone()).isFalse();
        Assertions.assertThat(noPriorityCleanup0.isDone()).isFalse();
        Assertions.assertThat(noPriorityCleanup1.isDone()).isFalse();
        Assertions.assertThat(overallCleanupResult.isDone()).isFalse();

        lowerThanHighPriorityCleanup.completeCleanup();

        Assertions.assertThat(overallCleanupResult).isCompleted();
        Assertions.assertThat(highPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
        Assertions.assertThat(noPriorityCleanup0.isDone()).isTrue();
        Assertions.assertThat(noPriorityCleanup1.isDone()).isTrue();
    }

    /**
     * A regular cleanup that fails on its first two runs is invoked three times in total when the
     * retry strategy is created with two retries; the overall cleanup succeeds.
     */
    @Test
    void testCleanupWithRetries() {
        final ArrayList<JobID> actualJobIds = new ArrayList<>();
        final CleanupCallback cleanupWithRetries = cleanupWithInitialFailingRuns(actualJobIds, 2);
        final SingleCallCleanup oneRunCleanup = SingleCallCleanup.withCompletionOnCleanup();

        final CompletableFuture<Void> compositeCleanupResult =
                createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
                        .withRegularCleanup("Reg #0", cleanupWithRetries)
                        .withRegularCleanup("Reg #1", oneRunCleanup)
                        .build()
                        .cleanupAsync(JOB_ID);

        FlinkAssertions.assertThatFuture(compositeCleanupResult).eventuallySucceeds();

        Assertions.assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
        Assertions.assertThat(oneRunCleanup.isDone()).isTrue();
        Assertions.assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
    }

    /**
     * A prioritized cleanup that fails on its first run is invoked twice when the retry strategy
     * is created with one retry; the overall cleanup succeeds.
     */
    @Test
    void testCleanupWithSingleRetryInHighPriorityTask() {
        final ArrayList<JobID> actualJobIds = new ArrayList<>();
        final CleanupCallback cleanupWithRetry = cleanupWithInitialFailingRuns(actualJobIds, 1);
        final SingleCallCleanup oneRunHigherPriorityCleanup =
                SingleCallCleanup.withCompletionOnCleanup();
        final SingleCallCleanup oneRunCleanup = SingleCallCleanup.withCompletionOnCleanup();

        final CompletableFuture<Void> compositeCleanupResult =
                createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
                        .withPrioritizedCleanup("Prio #0", cleanupWithRetry)
                        .withPrioritizedCleanup("Prio #1", oneRunHigherPriorityCleanup)
                        .withRegularCleanup("Reg #0", oneRunCleanup)
                        .build()
                        .cleanupAsync(JOB_ID);

        FlinkAssertions.assertThatFuture(compositeCleanupResult).eventuallySucceeds();

        Assertions.assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
        Assertions.assertThat(oneRunCleanup.isDone()).isTrue();
        Assertions.assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID);
    }

    /**
     * Asserts that {@code cleanupResult} fails and that its chain of causes is exactly
     * {@link ExecutionException} -> {@link FutureUtils.RetryException} ->
     * {@link CompletionException} -> {@code expectedException}.
     *
     * @param cleanupResult the overall cleanup future to inspect
     * @param expectedException the exact instance expected at the end of the cause chain
     */
    private static void assertFailsWithRootCause(
            CompletableFuture<Void> cleanupResult, Throwable expectedException) {
        FlinkAssertions.assertThatFuture(cleanupResult)
                .eventuallyFailsWith(ExecutionException.class)
                .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE)
                .hasExactlyElementsOfTypes(
                        ExecutionException.class,
                        FutureUtils.RetryException.class,
                        CompletionException.class,
                        expectedException.getClass())
                .last()
                .isEqualTo(expectedException);
    }

    /** Creates a builder configured with {@link TestingRetryStrategies#NO_RETRY_STRATEGY}. */
    private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder() {
        return createTestInstanceBuilder(TestingRetryStrategies.NO_RETRY_STRATEGY);
    }

    /**
     * Creates a builder whose cleanup function simply invokes the registered {@link
     * CleanupCallback}.
     *
     * @param retryStrategy the retry strategy passed on to the cleaner
     * @return a builder for a {@link DefaultResourceCleaner} of {@link CleanupCallback}s
     */
    private static DefaultResourceCleaner.Builder<CleanupCallback> createTestInstanceBuilder(
            RetryStrategy retryStrategy) {
        return DefaultResourceCleaner.forCleanableResources(
                ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                EXECUTOR,
                CleanupCallback::apply,
                retryStrategy);
    }

    /**
     * Creates a callback that records every job id it is called with and returns an exceptionally
     * completed future for the first {@code numberOfFailureRuns} invocations; later invocations
     * return a completed future.
     *
     * @param actualJobIds collection each invocation's job id is appended to
     * @param numberOfFailureRuns number of initial invocations that fail
     * @return the recording callback
     */
    private static CleanupCallback cleanupWithInitialFailingRuns(
            Collection<JobID> actualJobIds, int numberOfFailureRuns) {
        final AtomicInteger remainingFailureRuns = new AtomicInteger(numberOfFailureRuns);
        return (actualJobId, executor) -> {
            actualJobIds.add(actualJobId);
            if (remainingFailureRuns.getAndDecrement() > 0) {
                return FutureUtils.completedExceptionally(
                        new RuntimeException("Expected RuntimeException"));
            }
            return FutureUtils.completedVoidFuture();
        };
    }

    /**
     * Cleanup callback that may be invoked at most once. It remembers the job id it was called
     * with and exposes its result future so that a test can complete it manually.
     */
    private static class SingleCallCleanup implements CleanupCallback {

        private final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

        /** Action run on the result future when the cleanup is triggered. */
        private final Consumer<CompletableFuture<Void>> internalFunction;

        /** Job id passed to {@link #apply}; {@code null} until the cleanup was triggered. */
        private JobID jobId;

        /** Creates a cleanup whose result future completes as soon as it is triggered. */
        public static SingleCallCleanup withCompletionOnCleanup() {
            return new SingleCallCleanup(resultFuture -> resultFuture.complete(null));
        }

        /** Creates a cleanup whose result future stays pending until completed by the test. */
        public static SingleCallCleanup withoutCompletionOnCleanup() {
            return new SingleCallCleanup(ignoredResultFuture -> {});
        }

        private SingleCallCleanup(Consumer<CompletableFuture<Void>> internalFunction) {
            this.internalFunction = internalFunction;
        }

        /**
         * Records the job id, runs the configured action on the result future and returns that
         * future.
         *
         * @throws IllegalStateException if the cleanup was already triggered before
         */
        @Override
        public CompletableFuture<Void> apply(JobID jobId, Executor executor) {
            Preconditions.checkState(
                    this.jobId == null, "The cleanup must not be triggered more than once.");
            this.jobId = jobId;
            internalFunction.accept(resultFuture);
            return resultFuture;
        }

        public boolean isDone() {
            return resultFuture.isDone();
        }

        public JobID getProcessedJobId() {
            return jobId;
        }

        public void completeCleanup() {
            resultFuture.complete(null);
        }

        public void completeCleanupExceptionally(Throwable expectedException) {
            resultFuture.completeExceptionally(expectedException);
        }
    }

    /** Cleanup operation taking the job id and an executor and returning its result future. */
    @FunctionalInterface
    private interface CleanupCallback
            extends BiFunction<JobID, Executor, CompletableFuture<Void>> {}
}

