/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStore;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcSystemUtils;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.token.DelegationTokenManager;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerExtension;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.VoidMetricQueryServiceRetriever;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class ProcessFailureCancelingITCase {
    private static final String TASK_DEPLOYED_MARKER = "deployed";
    private static final Duration TIMEOUT = Duration.ofMinutes(2L);
    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    @RegisterExtension
    public final EachCallbackWrapper<BlobServerExtension> blobServerExtensionWrapper = new EachCallbackWrapper((CustomExtension)new BlobServerExtension());
    @RegisterExtension
    public final EachCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper = new EachCallbackWrapper((CustomExtension)new ZooKeeperExtension());
    @TempDir
    public Path temporaryFolder;

    ProcessFailureCancelingITCase() {
    }

    @Test
    void testCancelingOnProcessFailure() throws Throwable {
        HighAvailabilityServices haServices;
        DispatcherResourceManagerComponent dispatcherResourceManagerComponent;
        RpcService rpcService;
        TestingFatalErrorHandler fatalErrorHandler;
        block8: {
            Assumptions.assumeTrue((CommonTestUtils.getJavaCommandPath() != null ? 1 : 0) != 0, (String)"---- Skipping Process Failure test : Could not find java executable ----");
            TestProcessBuilder.TestProcess taskManagerProcess = null;
            fatalErrorHandler = new TestingFatalErrorHandler();
            final Configuration config = new Configuration();
            config.setString(JobManagerOptions.ADDRESS, "localhost");
            config.set(AkkaOptions.ASK_TIMEOUT_DURATION, (Object)Duration.ofSeconds(100L));
            config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
            config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ((ZooKeeperExtension)this.zooKeeperExtensionWrapper.getCustomExtension()).getConnectString());
            config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TempDirUtils.newFolder((Path)this.temporaryFolder).getAbsolutePath());
            config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
            config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"4m"));
            config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, (Object)MemorySize.parse((String)"3200k"));
            config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, (Object)MemorySize.parse((String)"3200k"));
            config.set(TaskManagerOptions.TASK_HEAP_MEMORY, (Object)MemorySize.parse((String)"128m"));
            config.set(TaskManagerOptions.CPU_CORES, (Object)1.0);
            config.setInteger(RestOptions.PORT, 0);
            rpcService = RpcSystem.load().remoteServiceBuilder(config, "localhost", "0").createAndStart();
            int jobManagerPort = rpcService.getPort();
            config.setInteger(JobManagerOptions.PORT, jobManagerPort);
            DefaultDispatcherResourceManagerComponentFactory resourceManagerComponentFactory = DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory((ResourceManagerFactory)StandaloneResourceManagerFactory.getInstance());
            dispatcherResourceManagerComponent = null;
            ScheduledExecutorService ioExecutor = (ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor();
            haServices = HighAvailabilityServicesUtils.createHighAvailabilityServices((Configuration)config, (Executor)ioExecutor, (AddressResolution)AddressResolution.NO_ADDRESS_RESOLUTION, (RpcSystemUtils)RpcSystem.load(), (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
            final AtomicReference programException = new AtomicReference();
            try {
                dispatcherResourceManagerComponent = resourceManagerComponentFactory.create(config, ResourceID.generate(), (Executor)ioExecutor, rpcService, haServices, ((BlobServerExtension)this.blobServerExtensionWrapper.getCustomExtension()).getBlobServer(), (HeartbeatServices)new HeartbeatServicesImpl(100L, 10000L, 2), (DelegationTokenManager)new NoOpDelegationTokenManager(), NoOpMetricRegistry.INSTANCE, (ExecutionGraphInfoStore)new MemoryExecutionGraphInfoStore(), (MetricQueryServiceRetriever)VoidMetricQueryServiceRetriever.INSTANCE, Collections.emptySet(), (FatalErrorHandler)fatalErrorHandler);
                TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
                taskManagerProcessBuilder.addConfigAsMainClassArgs(config);
                taskManagerProcess = taskManagerProcessBuilder.start();
                Runnable programRunner = new Runnable(){

                    @Override
                    public void run() {
                        try {
                            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)1337, (Configuration)config, (String[])new String[0]);
                            env.setParallelism(2);
                            env.setRestartStrategy(RestartStrategies.noRestart());
                            env.generateSequence(0L, Long.MAX_VALUE).map((MapFunction)new MapFunction<Long, Long>(){

                                /*
                                 * WARNING - Removed try catching itself - possible behaviour change.
                                 */
                                public Long map(Long value) throws Exception {
                                    1 var2_2 = this;
                                    synchronized (var2_2) {
                                        System.out.println(ProcessFailureCancelingITCase.TASK_DEPLOYED_MARKER);
                                        this.wait();
                                    }
                                    return 0L;
                                }
                            }).output((OutputFormat)new DiscardingOutputFormat());
                            env.execute();
                        }
                        catch (Throwable t) {
                            programException.set(t);
                        }
                    }
                };
                Thread programThread = new Thread(programRunner);
                programThread.start();
                ProcessFailureCancelingITCase.waitUntilAtLeastOneTaskHasBeenDeployed(taskManagerProcess);
                taskManagerProcess.destroy();
                taskManagerProcess = null;
                programThread.join(TIMEOUT.toMillis());
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)programThread.isAlive()).withFailMessage("The program did not cancel in time", new Object[0])).isFalse();
                ((AbstractThrowableAssert)Assertions.assertThat((Throwable)((Throwable)programException.get())).withFailMessage("The program did not fail properly", new Object[0])).isInstanceOf(ProgramInvocationException.class);
                if (taskManagerProcess == null) break block8;
            }
            catch (Error | Exception e) {
                try {
                    if (taskManagerProcess != null) {
                        ProcessFailureCancelingITCase.printOutput("TaskManager OUT", taskManagerProcess.getProcessOutput().toString());
                        ProcessFailureCancelingITCase.printOutput("TaskManager ERR", taskManagerProcess.getErrorOutput().toString());
                    }
                    throw ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)((Throwable)programException.get()));
                }
                catch (Throwable throwable) {
                    if (taskManagerProcess != null) {
                        taskManagerProcess.destroy();
                    }
                    if (dispatcherResourceManagerComponent != null) {
                        dispatcherResourceManagerComponent.stopApplication(ApplicationStatus.SUCCEEDED, null).get();
                    }
                    fatalErrorHandler.rethrowError();
                    RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
                    haServices.closeAndCleanupAllData();
                    throw throwable;
                }
            }
            taskManagerProcess.destroy();
        }
        if (dispatcherResourceManagerComponent != null) {
            dispatcherResourceManagerComponent.stopApplication(ApplicationStatus.SUCCEEDED, null).get();
        }
        fatalErrorHandler.rethrowError();
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
        haServices.closeAndCleanupAllData();
    }

    private static void waitUntilAtLeastOneTaskHasBeenDeployed(TestProcessBuilder.TestProcess taskManagerProcess) throws InterruptedException, TimeoutException {
        org.apache.flink.core.testutils.CommonTestUtils.waitUtil(() -> taskManagerProcess.getProcessOutput().toString().contains(TASK_DEPLOYED_MARKER), (Duration)Duration.ofMinutes(2L), null);
    }

    private static void printOutput(String processName, String logContents) {
        if (logContents == null || logContents.length() == 0) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(logContents);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}

