package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URL;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.core.security.FlinkSecurityManager;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.shuffle.ShuffleIOOwnerContext;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload;
import org.apache.flink.types.Either;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TaskManagerExceptionUtils;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/Task.class */
public class Task implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(Task.class);
    /** Thread group ("Flink Task Threads") in which each task's executing thread is created. */
    private static final ThreadGroup TASK_THREADS_GROUP = new ThreadGroup("Flink Task Threads");
    /** Atomic field updater targeting the volatile {@code executionState} field of this class. */
    private static final AtomicReferenceFieldUpdater<Task, ExecutionState> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Task.class, ExecutionState.class, "executionState");

    // ---- Identity of the job, vertex, execution attempt and slot allocation (set in the constructor) ----
    private final JobID jobId;
    private final JobVertexID vertexId;
    private final ExecutionAttemptID executionId;
    private final AllocationID allocationId;
    /** Built in the constructor from the task information, the execution attempt and the allocation ID. */
    private final TaskInfo taskInfo;
    /** Cached result of {@code taskInfo.getTaskNameWithSubtasks()}; used in log messages and as thread name. */
    private final String taskNameWithSubtask;

    // ---- Job- and task-level configuration and user-code artifacts (taken from JobInformation / TaskInformation) ----
    private final Configuration jobConfiguration;
    private final Configuration taskConfiguration;
    private final Collection<PermanentBlobKey> requiredJarFiles;
    private final Collection<URL> requiredClasspaths;
    /** Class name passed to {@code loadAndInstantiateInvokable} when the task runs. */
    private final String nameOfInvokableClass;

    // ---- Services and collaborators handed in through the constructor ----
    private final TaskManagerRuntimeInfo taskManagerConfig;
    private final MemoryManager memoryManager;
    private final IOManager ioManager;
    private final BroadcastVariableManager broadcastVariableManager;
    private final TaskEventDispatcher taskEventDispatcher;
    private final ExternalResourceInfoProvider externalResourceInfoProvider;
    private final TaskStateManager taskStateManager;
    /** Serialized execution config; deserialized with the user-code class loader when the task runs. */
    private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
    /** Result partition writers created by the shuffle environment in the constructor. */
    private final ResultPartitionWriter[] partitionWriters;
    /** Input gates created by the shuffle environment, each wrapped in an {@code InputGateWithMetrics}. */
    private final IndexedInputGate[] inputGates;
    private final TaskManagerActions taskManagerActions;
    private final InputSplitProvider inputSplitProvider;
    private final CheckpointResponder checkpointResponder;
    private final TaskOperatorEventGateway operatorCoordinatorEventGateway;
    private final GlobalAggregateManager aggregateManager;
    private final LibraryCacheManager.ClassLoaderHandle classLoaderHandle;
    private final FileCache fileCache;
    private final KvStateService kvStateService;
    /** Created in the constructor for this job ID and execution ID. */
    private final AccumulatorRegistry accumulatorRegistry;
    /** Thread that runs this task; created in the constructor, started by {@code startTaskThread()}. */
    private final Thread executingThread;
    private final TaskMetricGroup metrics;
    private final PartitionProducerStateChecker partitionProducerStateChecker;
    private final Executor executor;
    /** Initialized to {@code false} in the constructor. NOTE(review): presumably guards against cancelling the invokable twice - confirm against cancelInvokable. */
    private final AtomicBoolean invokableHasBeenCanceled;

    /** The instantiated invokable; assigned in {@code doRun()} and reset to {@code null} when task resources are freed. */
    @Nullable
    private volatile TaskInvokable invokable;
    /** Exposed through {@code getFailureCause()}; the code that assigns it is outside this section. */
    private volatile Throwable failureCause;
    /** Read from {@code TaskManagerOptions.TASK_CANCELLATION_INTERVAL}; overridden in {@code doRun()} when the execution config supplies a non-negative value. */
    private long taskCancellationInterval;
    /** Read from {@code TaskManagerOptions.TASK_CANCELLATION_TIMEOUT}; overridden in {@code doRun()} when the execution config supplies a non-negative value. */
    private long taskCancellationTimeout;
    /** Assigned in {@code doRun()} from {@code createUserCodeClassloader()}. */
    private UserCodeClassLoader userCodeClassLoader;
    /** Completed in {@code run()} with the execution state observed once {@code doRun()} has returned or thrown. */
    private final CompletableFuture<ExecutionState> terminationFuture = new CompletableFuture<>();
    /** Current execution state; starts as {@code CREATED}. */
    private volatile ExecutionState executionState = ExecutionState.CREATED;

    /**
     * Enumerates the three checkpoint notification operations known to this class.
     *
     * <p>NOTE(review): presumably selects which checkpoint notification is delivered to the
     * invokable (abort / complete / subsume); the code that consumes this enum is outside this
     * section - confirm there.
     */
    public enum NotifyCheckpointOperation {
        ABORT,
        COMPLETE,
        SUBSUME
    }

    /**
     * Response handle carrying the outcome of a producer-state lookup: either the producer's
     * {@link ExecutionState} (left) or the {@link Throwable} that prevented obtaining it (right).
     *
     * <p>Besides the lookup result, the handle exposes the enclosing task's current execution
     * state and lets the receiver cancel or fail the enclosing task.
     */
    @VisibleForTesting
    class PartitionProducerStateResponseHandle implements PartitionProducerStateProvider.ResponseHandle {

        /** Left: producer state; right: failure (taken as-is from the constructor arguments). */
        private final Either<ExecutionState, Throwable> result;

        /**
         * @param executionState producer state; when non-null it becomes the left value
         * @param th failure used as the right value whenever {@code executionState} is null
         */
        PartitionProducerStateResponseHandle(@Nullable ExecutionState executionState, @Nullable Throwable th) {
            if (executionState != null) {
                this.result = Either.Left(executionState);
            } else {
                this.result = Either.Right(th);
            }
        }

        /** Returns the current execution state of the enclosing (consuming) task. */
        @Override
        public ExecutionState getConsumerExecutionState() {
            return Task.this.executionState;
        }

        /** Returns the producer state or the lookup failure captured at construction. */
        @Override
        public Either<ExecutionState, Throwable> getProducerExecutionState() {
            return result;
        }

        /** Cancels the enclosing task. */
        @Override
        public void cancelConsumption() {
            cancelExecution();
        }

        /** Fails the enclosing task with the given cause. */
        @Override
        public void failConsumption(Throwable th) {
            failExternally(th);
        }
    }

    /**
     * Runnable that performs the cancellation steps for one invokable: it calls
     * {@code cancel()} on the invokable, fails all result partitions and closes all input gates
     * of the enclosing task, and finally asks the invokable to interrupt the executing thread.
     */
    public class TaskCanceler implements Runnable {
        private final Logger logger;
        private final TaskInvokable invokable;
        private final Thread executer;
        private final String taskName;

        TaskCanceler(Logger logger, TaskInvokable taskInvokable, Thread thread, String str) {
            this.logger = logger;
            this.invokable = taskInvokable;
            this.executer = thread;
            this.taskName = str;
        }

        @Override
        public void run() {
            try {
                // A non-fatal failure of cancel() is only logged; the remaining steps still run.
                cancelInvokableAndLogFailure();

                failAllResultPartitions();
                closeAllInputGates();

                invokable.maybeInterruptOnCancel(executer, null, null);
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalError(failure);
                logger.error("Error in the task canceler for task {}.", taskName, failure);
            }
        }

        /** Calls {@code cancel()} on the invokable; fatal errors propagate, anything else is logged. */
        private void cancelInvokableAndLogFailure() {
            try {
                invokable.cancel();
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalError(failure);
                logger.error("Error while canceling the task {}.", taskName, failure);
            }
        }
    }

    /**
     * Watchdog that waits up to a timeout for the task's executing thread to terminate. If the
     * thread is still alive once the deadline has passed, it calls
     * {@code Task.logTaskThreadStackTrace} for that thread and reports a fatal error through the
     * {@link TaskManagerActions}.
     */
    public static class TaskCancelerWatchDog implements Runnable {
        private final Thread executerThread;
        private final TaskManagerActions taskManager;
        private final long timeoutMillis;
        private final TaskInfo taskInfo;

        /**
         * @param taskInfo task info used to obtain the task name for the stack-trace log
         * @param thread the executing thread to watch
         * @param taskManagerActions receiver of the fatal-error notification
         * @param j timeout in milliseconds; must be strictly positive
         * @throws IllegalArgumentException if {@code j} is not greater than zero
         */
        TaskCancelerWatchDog(TaskInfo taskInfo, Thread thread, TaskManagerActions taskManagerActions, long j) {
            Preconditions.checkArgument(j > 0);
            this.taskInfo = taskInfo;
            this.executerThread = thread;
            this.taskManager = taskManagerActions;
            this.timeoutMillis = j;
        }

        /**
         * Joins the executing thread until it dies or the deadline expires, then escalates if it
         * is still alive.
         *
         * @throws FlinkRuntimeException wrapping any throwable raised while watching or notifying
         */
        @Override
        public void run() {
            try {
                Deadline deadline = Deadline.fromNow(Duration.ofMillis(this.timeoutMillis));
                while (this.executerThread.isAlive() && deadline.hasTimeLeft()) {
                    try {
                        // join(0) would wait forever, so never wait for less than 1 ms.
                        this.executerThread.join(Math.max(1L, deadline.timeLeft().toMillis()));
                    } catch (InterruptedException ignored) {
                        // Deliberately ignored: the loop condition re-checks liveness and deadline.
                    }
                }
                if (this.executerThread.isAlive()) {
                    Task.logTaskThreadStackTrace(this.executerThread, this.taskInfo.getTaskNameWithSubtasks(), this.timeoutMillis, "notifying TM");
                    // Fixed message: the original contained a stray literal "+" ("... within 180 + seconds.").
                    String message = "Task did not exit gracefully within " + (this.timeoutMillis / 1000) + " seconds.";
                    this.taskManager.notifyFatalError(message, new FlinkRuntimeException(message));
                }
            } catch (Throwable th) {
                throw new FlinkRuntimeException("Error in Task Cancellation Watch Dog", th);
            }
        }
    }

    /**
     * Runnable that repeatedly asks the invokable to interrupt the executing thread, once per
     * interval, for as long as that thread is alive.
     */
    public static final class TaskInterrupter implements Runnable {
        private final Logger log;
        private final TaskInvokable task;
        private final Thread executerThread;
        private final String taskName;
        private final long interruptIntervalMillis;

        TaskInterrupter(Logger logger, TaskInvokable taskInvokable, Thread thread, String str, long j) {
            this.log = logger;
            this.task = taskInvokable;
            this.executerThread = thread;
            this.taskName = str;
            this.interruptIntervalMillis = j;
        }

        @Override
        public void run() {
            try {
                // Wait one interval before the first interrupt; an InterruptedException here is
                // handled by the outer catch, which logs it and ends this runnable.
                executerThread.join(interruptIntervalMillis);

                while (executerThread.isAlive()) {
                    task.maybeInterruptOnCancel(executerThread, taskName, interruptIntervalMillis);
                    try {
                        executerThread.join(interruptIntervalMillis);
                    } catch (InterruptedException ignored) {
                        // Ignored inside the loop: liveness is re-checked on the next iteration.
                    }
                }
            } catch (Throwable failure) {
                ExceptionUtils.rethrowIfFatalError(failure);
                log.error("Error in the task canceler for task {}.", taskName, failure);
            }
        }
    }

    /**
     * Creates the task: records job/task identity and configuration, stores the supplied
     * services, lets the shuffle environment create the result partition writers and input
     * gates, and creates (but does not start) the executing thread.
     *
     * <p>Most arguments are null-checked. Several of them ({@code executionAttemptID},
     * {@code taskManagerRuntimeInfo}) are dereferenced before their explicit check, so a null
     * value fails with a {@link NullPointerException} either way. {@code taskMetricGroup} is
     * stored without an explicit check.
     *
     * @throws NullPointerException if a required argument is null
     */
    public Task(
            JobInformation jobInformation,
            TaskInformation taskInformation,
            ExecutionAttemptID executionAttemptID,
            AllocationID allocationID,
            List<ResultPartitionDeploymentDescriptor> list,
            List<InputGateDeploymentDescriptor> list2,
            MemoryManager memoryManager,
            IOManager iOManager,
            ShuffleEnvironment<?, ?> shuffleEnvironment,
            KvStateService kvStateService,
            BroadcastVariableManager broadcastVariableManager,
            TaskEventDispatcher taskEventDispatcher,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            TaskStateManager taskStateManager,
            TaskManagerActions taskManagerActions,
            InputSplitProvider inputSplitProvider,
            CheckpointResponder checkpointResponder,
            TaskOperatorEventGateway taskOperatorEventGateway,
            GlobalAggregateManager globalAggregateManager,
            LibraryCacheManager.ClassLoaderHandle classLoaderHandle,
            FileCache fileCache,
            TaskManagerRuntimeInfo taskManagerRuntimeInfo,
            @Nonnull TaskMetricGroup taskMetricGroup,
            PartitionProducerStateChecker partitionProducerStateChecker,
            Executor executor) {
        Preconditions.checkNotNull(jobInformation);
        Preconditions.checkNotNull(taskInformation);

        // Identity of this execution attempt.
        this.taskInfo =
                new TaskInfo(
                        taskInformation.getTaskName(),
                        taskInformation.getMaxNumberOfSubtasks(),
                        executionAttemptID.getSubtaskIndex(),
                        taskInformation.getNumberOfSubtasks(),
                        executionAttemptID.getAttemptNumber(),
                        String.valueOf(allocationID));
        this.jobId = jobInformation.getJobId();
        this.vertexId = taskInformation.getJobVertexId();
        this.executionId = Preconditions.checkNotNull(executionAttemptID);
        this.allocationId = Preconditions.checkNotNull(allocationID);
        this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();

        // Configuration and user-code artifacts.
        this.jobConfiguration = jobInformation.getJobConfiguration();
        this.taskConfiguration = taskInformation.getTaskConfiguration();
        this.requiredJarFiles = jobInformation.getRequiredJarFileBlobKeys();
        this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
        this.nameOfInvokableClass = taskInformation.getInvokableClassName();
        this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();

        // Cancellation settings from the TaskManager configuration.
        Configuration tmConfiguration = taskManagerRuntimeInfo.getConfiguration();
        this.taskCancellationInterval = tmConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
        this.taskCancellationTimeout = tmConfiguration.getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT);

        // Services and collaborators.
        this.memoryManager = Preconditions.checkNotNull(memoryManager);
        this.ioManager = Preconditions.checkNotNull(iOManager);
        this.broadcastVariableManager = Preconditions.checkNotNull(broadcastVariableManager);
        this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
        this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
        this.accumulatorRegistry = new AccumulatorRegistry(jobId, executionId);
        this.inputSplitProvider = Preconditions.checkNotNull(inputSplitProvider);
        this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
        this.operatorCoordinatorEventGateway = Preconditions.checkNotNull(taskOperatorEventGateway);
        this.aggregateManager = Preconditions.checkNotNull(globalAggregateManager);
        this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
        this.externalResourceInfoProvider = Preconditions.checkNotNull(externalResourceInfoProvider);
        this.classLoaderHandle = Preconditions.checkNotNull(classLoaderHandle);
        this.fileCache = Preconditions.checkNotNull(fileCache);
        this.kvStateService = Preconditions.checkNotNull(kvStateService);
        this.taskManagerConfig = Preconditions.checkNotNull(taskManagerRuntimeInfo);
        this.metrics = taskMetricGroup;
        this.partitionProducerStateChecker = Preconditions.checkNotNull(partitionProducerStateChecker);
        this.executor = Preconditions.checkNotNull(executor);

        // Network setup: result partition writers and input gates owned by this task.
        ShuffleIOOwnerContext shuffleOwnerContext =
                shuffleEnvironment.createShuffleIOOwnerContext(
                        taskNameWithSubtask + " (" + executionId + ')',
                        executionId,
                        metrics.getIOMetricGroup());
        ResultPartitionWriter[] writers =
                shuffleEnvironment.createResultPartitionWriters(shuffleOwnerContext, list)
                        .toArray(new ResultPartitionWriter[0]);
        this.partitionWriters = writers;

        IndexedInputGate[] rawGates =
                shuffleEnvironment.createInputGates(shuffleOwnerContext, this, list2)
                        .toArray(new IndexedInputGate[0]);
        this.inputGates = new IndexedInputGate[rawGates.length];
        for (int gateIndex = 0; gateIndex < rawGates.length; gateIndex++) {
            // Each gate is wrapped in InputGateWithMetrics together with the bytes-in counter.
            this.inputGates[gateIndex] =
                    new InputGateWithMetrics(rawGates[gateIndex], metrics.getIOMetricGroup().getNumBytesInCounter());
        }

        // The legacy network metrics are registered against the unwrapped gates.
        if (shuffleEnvironment instanceof NettyShuffleEnvironment) {
            ((NettyShuffleEnvironment) shuffleEnvironment)
                    .registerLegacyNetworkMetrics(metrics.getIOMetricGroup(), writers, rawGates);
        }

        this.invokableHasBeenCanceled = new AtomicBoolean(false);
        this.executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
    }

    /** Returns the ID of the job this task belongs to, as taken from the job information. */
    @Override
    public JobID getJobID() {
        return jobId;
    }

    /** Returns the job vertex ID taken from the task information at construction. */
    public JobVertexID getJobVertexId() {
        return vertexId;
    }

    /** Returns the execution attempt ID this task was created with. */
    @Override
    public ExecutionAttemptID getExecutionId() {
        return executionId;
    }

    /** Returns the allocation ID this task was created with. */
    @Override
    public AllocationID getAllocationId() {
        return allocationId;
    }

    /** Returns the {@link TaskInfo} built for this task in the constructor. */
    public TaskInfo getTaskInfo() {
        return taskInfo;
    }

    /** Returns the job-level configuration taken from the job information. */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** Returns the task-level configuration taken from the task information. */
    public Configuration getTaskConfiguration() {
        return taskConfiguration;
    }

    /** Returns the accumulator registry created for this task's job ID and execution ID. */
    public AccumulatorRegistry getAccumulatorRegistry() {
        return accumulatorRegistry;
    }

    /** Returns the metric group handed to this task at construction. */
    public TaskMetricGroup getMetricGroup() {
        return metrics;
    }

    /** Returns the thread created in the constructor to run this task. */
    public Thread getExecutingThread() {
        return executingThread;
    }

    /**
     * Returns the future that {@link #run()} completes with the task's execution state once the
     * task's main routine has ended.
     */
    @Override
    public CompletableFuture<ExecutionState> getTerminationFuture() {
        return terminationFuture;
    }

    /** Returns the currently effective task cancellation interval. */
    @VisibleForTesting
    long getTaskCancellationInterval() {
        return taskCancellationInterval;
    }

    /** Returns the currently effective task cancellation timeout. */
    @VisibleForTesting
    long getTaskCancellationTimeout() {
        return taskCancellationTimeout;
    }

    /**
     * Returns the invokable currently held by this task, or {@code null} if none is set (before
     * instantiation, or after the task's resources have been freed).
     */
    @VisibleForTesting
    @Nullable
    TaskInvokable getInvokable() {
        return invokable;
    }

    /**
     * Reports whether this task is back-pressured, i.e. whether at least one of its result
     * partition writers is currently unavailable.
     *
     * @return {@code false} if there is no invokable, no partition writer, or the task is neither
     *     {@code INITIALIZING} nor {@code RUNNING}; otherwise {@code true} iff some writer
     *     reports {@code isAvailable() == false}
     */
    public boolean isBackPressured() {
        if (invokable == null || partitionWriters.length == 0) {
            return false;
        }
        // Only tasks that are initializing or running are considered.
        if (!(executionState == ExecutionState.INITIALIZING || executionState == ExecutionState.RUNNING)) {
            return false;
        }
        for (ResultPartitionWriter writer : partitionWriters) {
            if (!writer.isAvailable()) {
                return true;
            }
        }
        return false;
    }

    /** Returns the task's current execution state. */
    public ExecutionState getExecutionState() {
        return executionState;
    }

    /**
     * Checks whether the task is in one of the states {@code CANCELING}, {@code CANCELED} or
     * {@code FAILED}.
     *
     * @return {@code true} if the execution state is one of those three, {@code false} otherwise
     */
    public boolean isCanceledOrFailed() {
        if (executionState == ExecutionState.CANCELING) {
            return true;
        }
        if (executionState == ExecutionState.CANCELED) {
            return true;
        }
        return executionState == ExecutionState.FAILED;
    }

    /** Returns the recorded failure cause of this task; may be {@code null} if none was recorded. */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /** Starts the thread that was created for this task in the constructor. */
    public void startTaskThread() {
        executingThread.start();
    }

    /**
     * Entry point of the executing thread: runs the task's main routine and, whether it returns
     * normally or throws, completes the termination future with the execution state observed at
     * that point.
     */
    @Override
    public void run() {
        try {
            doRun();
        } finally {
            terminationFuture.complete(executionState);
        }
    }

    private void doRun() {
        while (true) {
            ExecutionState executionState = this.executionState;
            if (executionState == ExecutionState.CREATED) {
                if (transitionState(ExecutionState.CREATED, ExecutionState.DEPLOYING)) {
                    HashMap hashMap = new HashMap();
                    try {
                        try {
                            LOG.debug("Creating FileSystem stream leak safety net for task {}", this);
                            FileSystemSafetyNet.initializeSafetyNetForThread();
                            LOG.info("Loading JAR files for task {}.", this);
                            this.userCodeClassLoader = createUserCodeClassloader();
                            ExecutionConfig executionConfig = (ExecutionConfig) this.serializedExecutionConfig.deserializeValue(this.userCodeClassLoader.asClassLoader());
                            if (executionConfig.getTaskCancellationInterval() >= 0) {
                                this.taskCancellationInterval = executionConfig.getTaskCancellationInterval();
                            }
                            if (executionConfig.getTaskCancellationTimeout() >= 0) {
                                this.taskCancellationTimeout = executionConfig.getTaskCancellationTimeout();
                            }
                            if (isCanceledOrFailed()) {
                                throw new CancelTaskException();
                            }
                            LOG.debug("Registering task at network: {}.", this);
                            setupPartitionsAndGates(this.partitionWriters, this.inputGates);
                            for (ResultPartitionWriter resultPartitionWriter : this.partitionWriters) {
                                this.taskEventDispatcher.registerPartition(resultPartitionWriter.getPartitionId());
                            }
                            try {
                                for (Map.Entry entry : DistributedCache.readFileInfoFromConfig(this.jobConfiguration)) {
                                    LOG.info("Obtaining local cache file for '{}'.", entry.getKey());
                                    hashMap.put(entry.getKey(), this.fileCache.createTmpFile((String) entry.getKey(), (DistributedCache.DistributedCacheEntry) entry.getValue(), this.jobId, this.executionId));
                                }
                                if (isCanceledOrFailed()) {
                                    throw new CancelTaskException();
                                }
                                RuntimeEnvironment runtimeEnvironment = new RuntimeEnvironment(this.jobId, this.vertexId, this.executionId, executionConfig, this.taskInfo, this.jobConfiguration, this.taskConfiguration, this.userCodeClassLoader, this.memoryManager, this.ioManager, this.broadcastVariableManager, this.taskStateManager, this.aggregateManager, this.accumulatorRegistry, this.kvStateService.createKvStateTaskRegistry(this.jobId, getJobVertexId()), this.inputSplitProvider, hashMap, this.partitionWriters, this.inputGates, this.taskEventDispatcher, this.checkpointResponder, this.operatorCoordinatorEventGateway, this.taskManagerConfig, this.metrics, this, this.externalResourceInfoProvider);
                                this.executingThread.setContextClassLoader(this.userCodeClassLoader.asClassLoader());
                                FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
                                try {
                                    TaskInvokable loadAndInstantiateInvokable = loadAndInstantiateInvokable(this.userCodeClassLoader.asClassLoader(), this.nameOfInvokableClass, runtimeEnvironment);
                                    FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
                                    this.invokable = loadAndInstantiateInvokable;
                                    restoreAndInvoke(loadAndInstantiateInvokable);
                                    if (isCanceledOrFailed()) {
                                        throw new CancelTaskException();
                                    }
                                    for (ResultPartitionWriter resultPartitionWriter2 : this.partitionWriters) {
                                        if (resultPartitionWriter2 != null) {
                                            resultPartitionWriter2.finish();
                                        }
                                    }
                                    if (!transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED)) {
                                        throw new CancelTaskException();
                                    }
                                    try {
                                        LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                        this.invokable = null;
                                        releaseResources();
                                        if (loadAndInstantiateInvokable != null) {
                                            this.memoryManager.releaseAll(loadAndInstantiateInvokable);
                                        }
                                        this.fileCache.releaseJob(this.jobId, this.executionId);
                                        LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
                                        FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                        notifyFinalState();
                                    } catch (Throwable th) {
                                        String format = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                        LOG.error(format, th);
                                        notifyFatalError(format, th);
                                    }
                                    try {
                                        this.metrics.close();
                                        return;
                                    } catch (Throwable th2) {
                                        LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th2});
                                        return;
                                    }
                                } catch (Throwable th3) {
                                    FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
                                    throw th3;
                                }
                            } catch (Exception e) {
                                throw new Exception(String.format("Exception while adding files to distributed cache of task %s (%s).", this.taskNameWithSubtask, this.executionId), e);
                            }
                        } catch (Throwable th4) {
                            Throwable preProcessException = preProcessException(th4);
                            while (true) {
                                try {
                                    ExecutionState executionState2 = this.executionState;
                                    if (executionState2 != ExecutionState.RUNNING && executionState2 != ExecutionState.INITIALIZING && executionState2 != ExecutionState.DEPLOYING) {
                                        if (executionState2 == ExecutionState.CANCELING) {
                                            if (transitionState(executionState2, ExecutionState.CANCELED)) {
                                                break;
                                            }
                                        } else if (executionState2 != ExecutionState.FAILED) {
                                            if (transitionState(executionState2, ExecutionState.FAILED, preProcessException)) {
                                                LOG.error("Unexpected state in task {} ({}) during an exception: {}.", new Object[]{this.taskNameWithSubtask, this.executionId, executionState2});
                                                break;
                                            }
                                        } else {
                                            break;
                                        }
                                    } else if (!ExceptionUtils.findThrowable(preProcessException, CancelTaskException.class).isPresent()) {
                                        if (transitionState(executionState2, ExecutionState.FAILED, preProcessException)) {
                                            cancelInvokable(null);
                                            break;
                                        }
                                    } else {
                                        if (transitionState(executionState2, ExecutionState.CANCELED, preProcessException)) {
                                            cancelInvokable(null);
                                            break;
                                        }
                                    }
                                } catch (Throwable th5) {
                                    String format2 = String.format("FATAL - exception in exception handler of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                    LOG.error(format2, th5);
                                    notifyFatalError(format2, th5);
                                }
                            }
                            try {
                                LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                                this.invokable = null;
                                releaseResources();
                                if (0 != 0) {
                                    this.memoryManager.releaseAll(null);
                                }
                                this.fileCache.releaseJob(this.jobId, this.executionId);
                                LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
                                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                                notifyFinalState();
                            } catch (Throwable th6) {
                                String format3 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                                LOG.error(format3, th6);
                                notifyFatalError(format3, th6);
                            }
                            try {
                                this.metrics.close();
                                return;
                            } catch (Throwable th7) {
                                LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th7});
                                return;
                            }
                        }
                    } catch (Throwable th8) {
                        try {
                            LOG.info("Freeing task resources for {} ({}).", this.taskNameWithSubtask, this.executionId);
                            this.invokable = null;
                            releaseResources();
                            if (0 != 0) {
                                this.memoryManager.releaseAll(null);
                            }
                            this.fileCache.releaseJob(this.jobId, this.executionId);
                            LOG.debug("Ensuring all FileSystem streams are closed for task {}", this);
                            FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                            notifyFinalState();
                        } catch (Throwable th9) {
                            String format4 = String.format("FATAL - exception in resource cleanup of task %s (%s).", this.taskNameWithSubtask, this.executionId);
                            LOG.error(format4, th9);
                            notifyFatalError(format4, th9);
                        }
                        try {
                            this.metrics.close();
                        } catch (Throwable th10) {
                            LOG.error("Error during metrics de-registration of task {} ({}).", new Object[]{this.taskNameWithSubtask, this.executionId, th10});
                        }
                        throw th8;
                    }
                }
            } else {
                if (executionState == ExecutionState.FAILED) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
                if (executionState != ExecutionState.CANCELING) {
                    if (this.metrics != null) {
                        this.metrics.close();
                    }
                    throw new IllegalStateException("Invalid state for beginning of operation of task " + this + '.');
                }
                if (transitionState(ExecutionState.CANCELING, ExecutionState.CANCELED)) {
                    notifyFinalState();
                    if (this.metrics != null) {
                        this.metrics.close();
                        return;
                    }
                    return;
                }
            }
        }
    }

    /**
     * Normalizes a failure before it is used as the cause of a state transition.
     *
     * <p>A {@code WrappingRuntimeException} is unwrapped, the result is handed to
     * {@code TaskManagerExceptionUtils.tryEnrichTaskManagerError}, and the JVM is halted when the
     * throwable is JVM-fatal, or is an {@link OutOfMemoryError} while the task manager
     * configuration asks to exit the JVM on out-of-memory errors.
     *
     * @param th the raw throwable
     * @return the (possibly unwrapped) throwable
     */
    private Throwable preProcessException(Throwable th) {
        Throwable cause = th;
        if (cause instanceof WrappingRuntimeException) {
            cause = ((WrappingRuntimeException) cause).unwrap();
        }
        TaskManagerExceptionUtils.tryEnrichTaskManagerError(cause);

        final boolean mustHaltJvm =
                ExceptionUtils.isJvmFatalError(cause)
                        || (cause instanceof OutOfMemoryError
                                && this.taskManagerConfig.shouldExitJvmOnOutOfMemoryError());
        if (!mustHaltJvm) {
            return cause;
        }

        try {
            LOG.error("Encountered fatal error {} - terminating the JVM", cause.getClass().getName(), cause);
            Runtime.getRuntime().halt(-1);
        } catch (Throwable haltFailure) {
            // Even if logging (or the first halt attempt) failed, still try to halt.
            Runtime.getRuntime().halt(-1);
            throw haltFailure;
        }
        return cause;
    }

    /**
     * Drives the invokable through restore, invoke and clean-up, moving the task from
     * DEPLOYING to INITIALIZING to RUNNING and reporting each new state to the task manager
     * actions.
     *
     * <p>If a state transition fails (the compare-and-set in {@code transitionState} found a
     * different current state), a {@link CancelTaskException} is thrown. On any failure the
     * invokable's {@code cleanUp} is called with that failure; an error raised by the clean-up
     * itself is attached as a suppressed exception.
     *
     * @param taskInvokable the invokable to run
     * @throws Exception whatever restore, invoke or clean-up throws
     */
    private void restoreAndInvoke(TaskInvokable taskInvokable) throws Exception {
        try {
            if (!transitionState(ExecutionState.DEPLOYING, ExecutionState.INITIALIZING)) {
                throw new CancelTaskException();
            }
            final TaskExecutionState initializing =
                    new TaskExecutionState(this.executionId, ExecutionState.INITIALIZING);
            this.taskManagerActions.updateTaskExecutionState(initializing);

            // Run user code with the user-code class loader as the thread's context class loader.
            this.executingThread.setContextClassLoader(this.userCodeClassLoader.asClassLoader());
            runWithSystemExitMonitoring(taskInvokable::restore);

            if (!transitionState(ExecutionState.INITIALIZING, ExecutionState.RUNNING)) {
                throw new CancelTaskException();
            }
            final TaskExecutionState running =
                    new TaskExecutionState(this.executionId, ExecutionState.RUNNING);
            this.taskManagerActions.updateTaskExecutionState(running);

            runWithSystemExitMonitoring(taskInvokable::invoke);
            runWithSystemExitMonitoring(() -> taskInvokable.cleanUp(null));
        } catch (Throwable failure) {
            try {
                runWithSystemExitMonitoring(() -> taskInvokable.cleanUp(failure));
            } catch (Throwable cleanUpFailure) {
                failure.addSuppressed(cleanUpFailure);
            }
            throw failure;
        }
    }

    /**
     * Runs the given action between
     * {@code FlinkSecurityManager.monitorUserSystemExitForCurrentThread()} and
     * {@code FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread()}; the latter is
     * called even when the action throws.
     *
     * @param runnableWithException the action to run
     * @throws Exception whatever the action throws
     */
    private void runWithSystemExitMonitoring(RunnableWithException runnableWithException) throws Exception {
        final RunnableWithException action = runnableWithException;
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            action.run();
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

    /**
     * Calls {@code setup()} on every result partition writer, then on every input gate, in
     * array order.
     *
     * @param resultPartitionWriterArr the writers to set up
     * @param inputGateArr the input gates to set up
     * @throws IOException if any {@code setup()} call fails
     */
    @VisibleForTesting
    public static void setupPartitionsAndGates(ResultPartitionWriter[] resultPartitionWriterArr, InputGate[] inputGateArr) throws IOException {
        for (int i = 0; i < resultPartitionWriterArr.length; i++) {
            resultPartitionWriterArr[i].setup();
        }
        for (int i = 0; i < inputGateArr.length; i++) {
            inputGateArr[i].setup();
        }
    }

    /**
     * Releases the task's network resources and closes the task state manager.
     *
     * <p>Order: unregister every partition from the task event dispatcher, fail all result
     * partitions if the task is canceled or failed, close all result partitions, close all
     * input gates, and finally close the task state manager (a failure there is only logged).
     */
    private void releaseResources() {
        LOG.debug("Release task {} network resources (state: {}).", this.taskNameWithSubtask, getExecutionState());

        for (ResultPartitionWriter writer : this.partitionWriters) {
            this.taskEventDispatcher.unregisterPartition(writer.getPartitionId());
        }

        final boolean canceledOrFailed = isCanceledOrFailed();
        if (canceledOrFailed) {
            failAllResultPartitions();
        }

        closeAllResultPartitions();
        closeAllInputGates();

        try {
            this.taskStateManager.close();
        } catch (Exception closeFailure) {
            LOG.error("Failed to close task state manager for task {}.", this.taskNameWithSubtask, closeFailure);
        }
    }

    /**
     * Fails every result partition writer of this task with the task's current failure cause.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was changed from
     * private.
     */
    public void failAllResultPartitions() {
        for (ResultPartitionWriter writer : this.partitionWriters) {
            // The cause is looked up per writer, exactly as before.
            final Throwable cause = getFailureCause();
            writer.fail(cause);
        }
    }

    /**
     * Closes every result partition writer. A failure to close one writer is passed to
     * {@code ExceptionUtils.rethrowIfFatalError} and otherwise logged, so the remaining writers
     * are still closed.
     */
    private void closeAllResultPartitions() {
        for (ResultPartitionWriter writer : this.partitionWriters) {
            try {
                writer.close();
            } catch (Throwable closeFailure) {
                ExceptionUtils.rethrowIfFatalError(closeFailure);
                LOG.error("Failed to release result partition for task {}.", this.taskNameWithSubtask, closeFailure);
            }
        }
    }

    /**
     * Closes every input gate, unless an invokable is present and reports that it uses
     * non-blocking input, in which case nothing is closed here.
     *
     * <p>A failure to close one gate is passed to {@code ExceptionUtils.rethrowIfFatalError}
     * and otherwise logged, so the remaining gates are still closed.
     *
     * <p>NOTE(review): the decompiler reported that the access modifier was changed from
     * private.
     */
    public void closeAllInputGates() {
        final TaskInvokable currentInvokable = this.invokable;
        if (currentInvokable != null && currentInvokable.isUsingNonBlockingInput()) {
            return;
        }

        for (IndexedInputGate gate : this.inputGates) {
            try {
                gate.close();
            } catch (Throwable closeFailure) {
                ExceptionUtils.rethrowIfFatalError(closeFailure);
                LOG.error("Failed to release input gate for task {}.", this.taskNameWithSubtask, closeFailure);
            }
        }
    }

    /**
     * Obtains the user-code class loader from the class loader handle for the required jar
     * files and classpaths, and logs at debug level how long that took.
     *
     * @return the resolved user-code class loader
     * @throws Exception if the class loader handle fails to resolve the class loader
     */
    private UserCodeClassLoader createUserCodeClassloader() throws Exception {
        final long startedAtMillis = System.currentTimeMillis();
        final UserCodeClassLoader resolved =
                this.classLoaderHandle.getOrResolveClassLoader(this.requiredJarFiles, this.requiredClasspaths);
        final long elapsedMillis = System.currentTimeMillis() - startedAtMillis;
        LOG.debug("Getting user code class loader for task {} at library cache manager took {} milliseconds", this.executionId, elapsedMillis);
        return resolved;
    }

    /**
     * Reports the task's current execution state and failure cause to the task manager
     * actions, after checking through {@code Preconditions.checkState} that the state is
     * terminal.
     */
    private void notifyFinalState() {
        Preconditions.checkState(this.executionState.isTerminal());
        final TaskExecutionState finalState =
                new TaskExecutionState(this.executionId, this.executionState, this.failureCause);
        this.taskManagerActions.updateTaskExecutionState(finalState);
    }

    /**
     * Forwards a fatal error to the task manager actions.
     *
     * @param str the message describing the error
     * @param th the error itself
     */
    private void notifyFatalError(String str, Throwable th) {
        final String message = str;
        final Throwable error = th;
        this.taskManagerActions.notifyFatalError(message, error);
    }

    /**
     * Attempts a state transition without a failure cause.
     *
     * @param executionState the state the task is expected to be in
     * @param executionState2 the state to switch to
     * @return {@code true} if the transition happened, {@code false} otherwise
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2) {
        final Throwable noCause = null;
        return transitionState(executionState, executionState2, noCause);
    }

    /**
     * Atomically switches the execution state from the expected state to the new state and
     * logs the transition.
     *
     * <p>The failure cause field is only updated when a cause is given and its chain does not
     * contain a {@link CancelTaskException}.
     *
     * @param executionState the state the task is expected to be in
     * @param executionState2 the state to switch to
     * @param th the cause of the transition, or {@code null}
     * @return {@code true} if the compare-and-set succeeded, {@code false} otherwise
     */
    private boolean transitionState(ExecutionState executionState, ExecutionState executionState2, Throwable th) {
        final boolean switched = STATE_UPDATER.compareAndSet(this, executionState, executionState2);
        if (!switched) {
            return false;
        }

        if (th == null) {
            LOG.info("{} ({}) switched from {} to {}.", this.taskNameWithSubtask, this.executionId, executionState, executionState2);
        } else if (ExceptionUtils.findThrowable(th, CancelTaskException.class).isPresent()) {
            LOG.info("{} ({}) switched from {} to {} due to CancelTaskException.", this.taskNameWithSubtask, this.executionId, executionState, executionState2);
            LOG.debug("{} ({}) switched from {} to {} due to CancelTaskException: {}", this.taskNameWithSubtask, this.executionId, executionState, executionState2, ExceptionUtils.stringifyException(th));
        } else {
            this.failureCause = th;
            LOG.warn("{} ({}) switched from {} to {} with failure cause: {}", this.taskNameWithSubtask, this.executionId, executionState, executionState2, ExceptionUtils.stringifyException(th));
        }
        return true;
    }

    /**
     * Requests cancellation of this task: logs the attempt and delegates to
     * {@code cancelOrFailAndCancelInvokable} with target state CANCELING and no cause.
     */
    public void cancelExecution() {
        LOG.info("Attempting to cancel task {} ({}).", this.taskNameWithSubtask, this.executionId);
        final Throwable noCause = null;
        cancelOrFailAndCancelInvokable(ExecutionState.CANCELING, noCause);
    }

    /**
     * Fails this task from the outside: logs the attempt and delegates to
     * {@code cancelOrFailAndCancelInvokable} with target state FAILED and the given cause.
     *
     * @param th the cause of the failure
     */
    @Override // org.apache.flink.runtime.taskexecutor.slot.TaskSlotPayload, org.apache.flink.runtime.taskmanager.TaskActions
    public void failExternally(Throwable th) {
        final Throwable cause = th;
        LOG.info("Attempting to fail task externally {} ({}).", this.taskNameWithSubtask, this.executionId);
        cancelOrFailAndCancelInvokable(ExecutionState.FAILED, cause);
    }

    /**
     * Runs {@code cancelOrFailAndCancelInvokableInternal} and reports JVM-fatal or
     * out-of-memory errors through {@code notifyFatalError}; any other throwable is rethrown
     * unchanged.
     *
     * @param executionState the target state (CANCELING or FAILED at the visible call sites)
     * @param th the cause, may be {@code null}
     */
    private void cancelOrFailAndCancelInvokable(ExecutionState executionState, Throwable th) {
        try {
            cancelOrFailAndCancelInvokableInternal(executionState, th);
        } catch (Throwable failure) {
            if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(failure)) {
                final String message =
                        String.format("FATAL - exception in cancelling task %s (%s).", this.taskNameWithSubtask, this.executionId);
                notifyFatalError(message, failure);
            } else {
                throw failure;
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:43:0x0019, code lost:
    
        org.apache.flink.runtime.taskmanager.Task.LOG.info("Task {} is already in state {}", r10.taskNameWithSubtask, r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x0029, code lost:
    
        return;
     */
    // NOTE(review): the decompiler could not reconstruct this method. The stub below only
    // throws UnsupportedOperationException, so the real cancel/fail logic is NOT present in
    // this file and must be restored from the original sources before this class is used.
    @org.apache.flink.annotation.VisibleForTesting
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    void cancelOrFailAndCancelInvokableInternal(org.apache.flink.runtime.execution.ExecutionState r11, java.lang.Throwable r12) {
        /*
            Method dump skipped, instructions count: 439
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.taskmanager.Task.cancelOrFailAndCancelInvokableInternal(org.apache.flink.runtime.execution.ExecutionState, java.lang.Throwable):void");
    }

    /**
     * Asks the partition producer state checker for the state of the producer of the given
     * result partition and hands the outcome to the consumer.
     *
     * <p>The state (or the failure of the request) is wrapped in a
     * {@code PartitionProducerStateResponseHandle}; the consumer is invoked with it on this
     * task's executor. The resulting future is passed to {@code FutureUtils.assertNoException}.
     *
     * @param intermediateDataSetID the data set the partition belongs to
     * @param resultPartitionID the partition whose producer state is requested
     * @param consumer receives the response handle
     */
    @Override // org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider
    public void requestPartitionProducerState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID, Consumer<? super PartitionProducerStateProvider.ResponseHandle> consumer) {
        // The former cast of the consumer to "Consumer<? super U>" referenced an undeclared
        // type variable and did not compile; the consumer's declared type is already accepted
        // by thenAcceptAsync, so no cast is needed.
        FutureUtils.assertNoException(
                this.partitionProducerStateChecker
                        .requestPartitionProducerState(this.jobId, intermediateDataSetID, resultPartitionID)
                        .handle((producerState, failure) -> new PartitionProducerStateResponseHandle(producerState, failure))
                        .thenAcceptAsync(consumer, this.executor));
    }

    /**
     * Asks the invokable to trigger a checkpoint asynchronously.
     *
     * <p>If the task is not RUNNING the checkpoint is declined with
     * {@code CHECKPOINT_DECLINED_TASK_NOT_READY}. If the asynchronous trigger completes
     * exceptionally or with {@code false}, the checkpoint is declined with
     * {@code TASK_FAILURE}. A {@link RejectedExecutionException} declines it with
     * {@code CHECKPOINT_DECLINED_TASK_CLOSING}; any other error fails the task if it is still
     * RUNNING and is only logged otherwise.
     *
     * @param j the checkpoint id
     * @param j2 second argument of the {@code CheckpointMetaData} constructor
     *     (presumably the checkpoint timestamp; verify against the caller)
     * @param checkpointOptions options passed through to the invokable
     */
    public void triggerCheckpointBarrier(long j, long j2, CheckpointOptions checkpointOptions) {
        final TaskInvokable currentInvokable = this.invokable;
        final CheckpointMetaData metaData = new CheckpointMetaData(j, j2, System.currentTimeMillis());

        if (this.executionState == ExecutionState.RUNNING) {
            Preconditions.checkState(currentInvokable instanceof CheckpointableTask, "invokable is not checkpointable");
            try {
                ((CheckpointableTask) currentInvokable)
                        .triggerCheckpointAsync(metaData, checkpointOptions)
                        .handle((triggered, failure) -> {
                            if (failure != null || !triggered.booleanValue()) {
                                declineCheckpoint(j, CheckpointFailureReason.TASK_FAILURE, failure);
                                return false;
                            }
                            return true;
                        });
            } catch (RejectedExecutionException rejected) {
                LOG.debug("Triggering checkpoint {} for {} ({}) was rejected by the mailbox", j, this.taskNameWithSubtask, this.executionId);
                declineCheckpoint(j, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
            } catch (Throwable triggerFailure) {
                if (getExecutionState() != ExecutionState.RUNNING) {
                    LOG.debug("Encountered error while triggering checkpoint {} for {} ({}) while being not in state running.", j, this.taskNameWithSubtask, this.executionId, triggerFailure);
                } else {
                    failExternally(new Exception("Error while triggering checkpoint " + j + " for " + this.taskNameWithSubtask, triggerFailure));
                }
            }
        } else {
            LOG.debug("Declining checkpoint request for non-running task {} ({}).", this.taskNameWithSubtask, this.executionId);
            declineCheckpoint(j, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
        }
    }

    /**
     * Declines a checkpoint without an underlying cause.
     *
     * @param j the checkpoint id
     * @param checkpointFailureReason why the checkpoint is declined
     */
    private void declineCheckpoint(long j, CheckpointFailureReason checkpointFailureReason) {
        final Throwable noCause = null;
        declineCheckpoint(j, checkpointFailureReason, noCause);
    }

    /**
     * Declines a checkpoint through the checkpoint responder, wrapping the reason and the
     * optional cause in a {@link CheckpointException} whose message names this task.
     *
     * @param j the checkpoint id
     * @param checkpointFailureReason why the checkpoint is declined
     * @param th the underlying cause, may be {@code null}
     */
    private void declineCheckpoint(long j, CheckpointFailureReason checkpointFailureReason, @Nullable Throwable th) {
        final String message = "Task name with subtask : " + this.taskNameWithSubtask;
        final CheckpointException declineCause = new CheckpointException(message, checkpointFailureReason, th);
        this.checkpointResponder.declineCheckpoint(this.jobId, this.executionId, j, declineCause);
    }

    /**
     * Notifies the task that a checkpoint completed; delegates to {@code notifyCheckpoint}
     * with operation COMPLETE and {@code -1} as the second id.
     *
     * @param j the id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long j) {
        final long unusedSecondId = -1L;
        notifyCheckpoint(j, unusedSecondId, NotifyCheckpointOperation.COMPLETE);
    }

    /**
     * Notifies the task that a checkpoint was aborted; delegates to {@code notifyCheckpoint}
     * with operation ABORT.
     *
     * @param j the id of the aborted checkpoint
     * @param j2 second id forwarded to the invokable's {@code notifyCheckpointAbortAsync}
     *     (presumably the latest completed checkpoint id; verify against the caller)
     */
    public void notifyCheckpointAborted(long j, long j2) {
        final NotifyCheckpointOperation operation = NotifyCheckpointOperation.ABORT;
        notifyCheckpoint(j, j2, operation);
    }

    /**
     * Notifies the task that a checkpoint was subsumed; delegates to
     * {@code notifyCheckpoint} with operation SUBSUME and {@code -1} as the second id.
     *
     * @param j the id of the subsumed checkpoint
     */
    public void notifyCheckpointSubsumed(long j) {
        final long unusedSecondId = -1L;
        notifyCheckpoint(j, unusedSecondId, NotifyCheckpointOperation.SUBSUME);
    }

    /**
     * Forwards a checkpoint notification (complete, abort or subsume) to the invokable.
     *
     * <p>The notification is ignored (and logged) when the task is not RUNNING or has no
     * invokable. A {@link RejectedExecutionException} from the invokable is only logged. Any
     * other error fails the task for ABORT and COMPLETE if it is still RUNNING, and is rethrown
     * through {@code ExceptionUtils.rethrow} for SUBSUME.
     *
     * @param j the checkpoint id
     * @param j2 second id, only used for ABORT
     * @param notifyCheckpointOperation which notification to deliver
     */
    private void notifyCheckpoint(long j, long j2, NotifyCheckpointOperation notifyCheckpointOperation) {
        final TaskInvokable currentInvokable = this.invokable;
        if (this.executionState != ExecutionState.RUNNING || currentInvokable == null) {
            LOG.info("Ignoring checkpoint {} notification for non-running task {}.", notifyCheckpointOperation, this.taskNameWithSubtask);
            return;
        }
        Preconditions.checkState(currentInvokable instanceof CheckpointableTask, "invokable is not checkpointable");
        try {
            switch (notifyCheckpointOperation) {
                case ABORT:
                    ((CheckpointableTask) currentInvokable).notifyCheckpointAbortAsync(j, j2);
                    break;
                case COMPLETE:
                    ((CheckpointableTask) currentInvokable).notifyCheckpointCompleteAsync(j);
                    break;
                case SUBSUME:
                    ((CheckpointableTask) currentInvokable).notifyCheckpointSubsumedAsync(j);
                    break;
            }
        } catch (RejectedExecutionException rejected) {
            // The format string previously read "{}}" and printed a stray '}' after the
            // operation name.
            LOG.debug("Notify checkpoint {} {} for {} ({}) was rejected by the mailbox.", notifyCheckpointOperation, j, this.taskNameWithSubtask, this.executionId);
        } catch (Throwable notifyFailure) {
            switch (notifyCheckpointOperation) {
                case ABORT:
                case COMPLETE:
                    if (getExecutionState() == ExecutionState.RUNNING) {
                        failExternally(new RuntimeException(String.format("Error while notify checkpoint %s.", notifyCheckpointOperation), notifyFailure));
                    }
                    break;
                case SUBSUME:
                    ExceptionUtils.rethrow(notifyFailure);
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Dispatches an operator event to the invokable if it is a {@code CoordinatedTask}.
     *
     * <p>NOTE(review): any failure raised by {@code dispatchOperatorEvent} propagates to the
     * caller as-is and the task is not failed here; confirm this is the intended handling.
     *
     * @param operatorID the operator the event is addressed to
     * @param serializedValue the serialized event
     * @throws TaskNotRunningException if there is no invokable or the task is neither RUNNING
     *     nor INITIALIZING
     * @throws FlinkException declared by this method; see the note above
     */
    public void deliverOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
        final TaskInvokable currentInvokable = this.invokable;
        final ExecutionState currentState = this.executionState;

        final boolean acceptsEvents =
                currentState == ExecutionState.RUNNING || currentState == ExecutionState.INITIALIZING;
        if (currentInvokable == null || !acceptsEvents) {
            throw new TaskNotRunningException("Task is not running, but in state " + currentState);
        }

        if (!(currentInvokable instanceof CoordinatedTask)) {
            return;
        }
        ((CoordinatedTask) currentInvokable).dispatchOperatorEvent(operatorID, serializedValue);
    }

    /**
     * Cancels the given invokable at most once: the call is skipped when the invokable is
     * {@code null} or when the "has been canceled" flag was already set. An error thrown by
     * {@code cancel()} is logged and not propagated.
     *
     * @param taskInvokable the invokable to cancel, may be {@code null}
     */
    private void cancelInvokable(TaskInvokable taskInvokable) {
        if (taskInvokable != null && this.invokableHasBeenCanceled.compareAndSet(false, true)) {
            try {
                taskInvokable.cancel();
            } catch (Throwable cancelFailure) {
                LOG.error("Error while canceling task {}.", this.taskNameWithSubtask, cancelFailure);
            }
        }
    }

    /**
     * Renders the task as {@code "<name with subtask> (<execution id>) [<state>]"}.
     *
     * @return a human-readable description of this task
     */
    @Override
    public String toString() {
        final String pattern = "%s (%s) [%s]";
        return String.format(pattern, this.taskNameWithSubtask, this.executionId, this.executionState);
    }

    /**
     * Loads the invokable class by name through the given class loader and instantiates it
     * with its {@code (Environment)} constructor.
     *
     * <p>The two phases are handled separately so that each failure is reported with the
     * message that matches it. Previously the handlers were nested, so every failure ended up
     * wrapped as "Could not load the task's invokable class.", the handler for a missing
     * constructor was unreachable, and an exception thrown by the constructor was not
     * forwarded directly.
     *
     * @param classLoader the class loader used to resolve the class
     * @param str the fully qualified name of the invokable class
     * @param environment the environment passed to the constructor
     * @return the new invokable instance
     * @throws Throwable an {@link Exception} if the class cannot be loaded or is not a
     *     {@code TaskInvokable}; a {@code FlinkException} if the constructor is missing or
     *     cannot be invoked; otherwise whatever the constructor itself threw
     */
    private static TaskInvokable loadAndInstantiateInvokable(ClassLoader classLoader, String str, Environment environment) throws Throwable {
        final Class<? extends TaskInvokable> invokableClass;
        try {
            invokableClass = Class.forName(str, true, classLoader).asSubclass(TaskInvokable.class);
        } catch (Throwable loadFailure) {
            throw new Exception("Could not load the task's invokable class.", loadFailure);
        }

        try {
            return invokableClass.getConstructor(Environment.class).newInstance(environment);
        } catch (NoSuchMethodException missingConstructor) {
            throw new FlinkException("Task misses proper constructor", missingConstructor);
        } catch (InvocationTargetException constructorFailure) {
            // Forward the exception thrown by the constructor itself, not the reflection wrapper.
            throw constructorFailure.getTargetException();
        } catch (Exception instantiationFailure) {
            throw new FlinkException("Could not instantiate the task's invokable class.", instantiationFailure);
        }
    }

    /**
     * Logs a warning containing the current stack trace of the given thread, one frame per
     * line.
     *
     * @param thread the thread whose stack trace is captured
     * @param str the task name shown in the message
     * @param j a duration in milliseconds; it is divided by 1000 and reported as seconds
     * @param str2 free-text detail placed after the task name in the message
     */
    public static void logTaskThreadStackTrace(Thread thread, String str, long j, String str2) {
        final StackTraceElement[] frames = thread.getStackTrace();
        final StringBuilder trace = new StringBuilder();
        for (int i = 0; i < frames.length; i++) {
            trace.append(frames[i]);
            trace.append('\n');
        }
        final long stuckSeconds = j / 1000;
        LOG.warn("Task '{}' did not react to cancelling signal - {}; it is stuck for {} seconds in method:\n {}", str, str2, stuckSeconds, trace);
    }
}
