package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.Serializable;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystemSafetyNet;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private static final Runnable POISON_LETTER = () -> {
    };
    private static final Runnable DEFAULT_ACTION_AVAILABLE = () -> {
    };
    private final Object lock;

    @Nullable
    protected StreamInputProcessor inputProcessor;
    protected OP headOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    protected final StreamConfig configuration;
    protected StateBackend stateBackend;
    private CheckpointStorageWorkerView checkpointStorage;
    protected ProcessingTimeService timerService;
    private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler;
    private final Map<String, Accumulator<?, ?>> accumulatorMap;
    private final CloseableRegistry cancelables;
    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private ExecutorService asyncOperationsThreadPool;
    private CheckpointExceptionHandler checkpointExceptionHandler;
    private final List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> recordWriters;
    private final SynchronousSavepointLatch syncSavepointLatch;
    protected final Mailbox mailbox;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$ActionContext.class */
    public final class ActionContext {
        private final Runnable actionUnavailableLetter;

        public ActionContext() {
            Mailbox mailbox = StreamTask.this.mailbox;
            mailbox.getClass();
            this.actionUnavailableLetter = ThrowingRunnable.unchecked(mailbox::waitUntilHasMail);
        }

        public void allActionsCompleted() {
            StreamTask.this.mailbox.clearAndPut(StreamTask.POISON_LETTER);
        }

        public void actionsAvailable() throws InterruptedException {
            StreamTask.this.mailbox.putMail(StreamTask.DEFAULT_ACTION_AVAILABLE);
        }

        public void actionsUnavailable() throws InterruptedException {
            StreamTask.this.mailbox.putMail(this.actionUnavailableLetter);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$AsyncCheckpointRunnable.class */
    public static final class AsyncCheckpointRunnable implements Runnable, Closeable {
        private final StreamTask<?, ?> owner;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
        private final long asyncStartNanos;
        private final AtomicReference<CheckpointingOperation.AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(CheckpointingOperation.AsyncCheckpointState.RUNNING);

        AsyncCheckpointRunnable(StreamTask<?, ?> streamTask, Map<OperatorID, OperatorSnapshotFutures> map, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, long j) {
            this.owner = (StreamTask) Preconditions.checkNotNull(streamTask);
            this.operatorSnapshotsInProgress = (Map) Preconditions.checkNotNull(map);
            this.checkpointMetaData = (CheckpointMetaData) Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointMetrics = (CheckpointMetrics) Preconditions.checkNotNull(checkpointMetrics);
            this.asyncStartNanos = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            FileSystemSafetyNet.initializeSafetyNetForThread();
            try {
                try {
                    TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size());
                    TaskStateSnapshot taskStateSnapshot2 = new TaskStateSnapshot(this.operatorSnapshotsInProgress.size());
                    for (Map.Entry<OperatorID, OperatorSnapshotFutures> entry : this.operatorSnapshotsInProgress.entrySet()) {
                        OperatorID key = entry.getKey();
                        OperatorSnapshotFinalizer operatorSnapshotFinalizer = new OperatorSnapshotFinalizer(entry.getValue());
                        taskStateSnapshot.putSubtaskStateByOperatorID(key, operatorSnapshotFinalizer.getJobManagerOwnedState());
                        taskStateSnapshot2.putSubtaskStateByOperatorID(key, operatorSnapshotFinalizer.getTaskLocalState());
                    }
                    long nanoTime = (System.nanoTime() - this.asyncStartNanos) / 1000000;
                    this.checkpointMetrics.setAsyncDurationMillis(nanoTime);
                    if (this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING, CheckpointingOperation.AsyncCheckpointState.COMPLETED)) {
                        reportCompletedSnapshotStates(taskStateSnapshot, taskStateSnapshot2, nanoTime);
                    } else {
                        StreamTask.LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.", this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()));
                    }
                    ((StreamTask) this.owner).cancelables.unregisterCloseable(this);
                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                } catch (Exception e) {
                    handleExecutionException(e);
                    ((StreamTask) this.owner).cancelables.unregisterCloseable(this);
                    FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                }
            } catch (Throwable th) {
                ((StreamTask) this.owner).cancelables.unregisterCloseable(this);
                FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
                throw th;
            }
        }

        private void reportCompletedSnapshotStates(TaskStateSnapshot taskStateSnapshot, TaskStateSnapshot taskStateSnapshot2, long j) {
            TaskStateManager taskStateManager = this.owner.getEnvironment().getTaskStateManager();
            boolean hasState = taskStateSnapshot.hasState();
            boolean hasState2 = taskStateSnapshot2.hasState();
            Preconditions.checkState(hasState || !hasState2, "Found cached state but no corresponding primary state is reported to the job manager. This indicates a problem.");
            taskStateManager.reportTaskStateSnapshots(this.checkpointMetaData, this.checkpointMetrics, hasState ? taskStateSnapshot : null, hasState2 ? taskStateSnapshot2 : null);
            StreamTask.LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(j)});
            StreamTask.LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), taskStateSnapshot});
        }

        private void handleExecutionException(Exception exc) {
            boolean z = false;
            CheckpointingOperation.AsyncCheckpointState asyncCheckpointState = this.asyncCheckpointState.get();
            while (true) {
                CheckpointingOperation.AsyncCheckpointState asyncCheckpointState2 = asyncCheckpointState;
                if (CheckpointingOperation.AsyncCheckpointState.DISCARDED == asyncCheckpointState2) {
                    break;
                }
                if (this.asyncCheckpointState.compareAndSet(asyncCheckpointState2, CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                    z = true;
                    try {
                        cleanup();
                    } catch (Exception e) {
                        exc.addSuppressed(e);
                    }
                    try {
                        ((StreamTask) this.owner).checkpointExceptionHandler.tryHandleCheckpointException(this.checkpointMetaData, new Exception("Could not materialize checkpoint " + this.checkpointMetaData.getCheckpointId() + " for operator " + this.owner.getName() + '.', exc));
                    } catch (Exception e2) {
                        this.owner.handleAsyncException("Failure in asynchronous checkpoint materialization", new AsynchronousException(e2));
                    }
                    asyncCheckpointState = CheckpointingOperation.AsyncCheckpointState.DISCARDED;
                } else {
                    asyncCheckpointState = this.asyncCheckpointState.get();
                }
            }
            if (z) {
                return;
            }
            StreamTask.LOG.trace("Caught followup exception from a failed checkpoint thread. This can be ignored.", exc);
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            if (!this.asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING, CheckpointingOperation.AsyncCheckpointState.DISCARDED)) {
                logFailedCleanupAttempt();
                return;
            }
            try {
                cleanup();
            } catch (Exception e) {
                StreamTask.LOG.warn("Could not properly clean up the async checkpoint runnable.", e);
            }
        }

        private void cleanup() throws Exception {
            StreamTask.LOG.debug("Cleanup AsyncCheckpointRunnable for checkpoint {} of {}.", Long.valueOf(this.checkpointMetaData.getCheckpointId()), this.owner.getName());
            Exception exc = null;
            for (OperatorSnapshotFutures operatorSnapshotFutures : this.operatorSnapshotsInProgress.values()) {
                if (operatorSnapshotFutures != null) {
                    try {
                        operatorSnapshotFutures.cancel();
                    } catch (Exception e) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
                    }
                }
            }
            if (null != exc) {
                throw exc;
            }
        }

        private void logFailedCleanupAttempt() {
            StreamTask.LOG.debug("{} - asynchronous checkpointing operation for checkpoint {} has already been completed. Thus, the state handles are not cleaned up.", this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$CheckpointingOperation.class */
    public static final class CheckpointingOperation {
        private final StreamTask<?, ?> owner;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointOptions checkpointOptions;
        private final CheckpointMetrics checkpointMetrics;
        private final CheckpointStreamFactory storageLocation;
        private final StreamOperator<?>[] allOperators;
        private long startSyncPartNano;
        private long startAsyncPartNano;
        private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$CheckpointingOperation$AsyncCheckpointState.class */
        public enum AsyncCheckpointState {
            RUNNING,
            DISCARDED,
            COMPLETED
        }

        public CheckpointingOperation(StreamTask<?, ?> streamTask, CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory, CheckpointMetrics checkpointMetrics) {
            this.owner = (StreamTask) Preconditions.checkNotNull(streamTask);
            this.checkpointMetaData = (CheckpointMetaData) Preconditions.checkNotNull(checkpointMetaData);
            this.checkpointOptions = (CheckpointOptions) Preconditions.checkNotNull(checkpointOptions);
            this.checkpointMetrics = (CheckpointMetrics) Preconditions.checkNotNull(checkpointMetrics);
            this.storageLocation = (CheckpointStreamFactory) Preconditions.checkNotNull(checkpointStreamFactory);
            this.allOperators = streamTask.operatorChain.getAllOperators();
            this.operatorSnapshotsInProgress = new HashMap(this.allOperators.length);
        }

        public void executeCheckpointing() throws Exception {
            this.startSyncPartNano = System.nanoTime();
            try {
                for (StreamOperator<?> streamOperator : this.allOperators) {
                    checkpointStreamOperator(streamOperator);
                }
                if (StreamTask.LOG.isDebugEnabled()) {
                    StreamTask.LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", Long.valueOf(this.checkpointMetaData.getCheckpointId()), this.owner.getName());
                }
                this.startAsyncPartNano = System.nanoTime();
                this.checkpointMetrics.setSyncDurationMillis((this.startAsyncPartNano - this.startSyncPartNano) / 1000000);
                AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(this.owner, this.operatorSnapshotsInProgress, this.checkpointMetaData, this.checkpointMetrics, this.startAsyncPartNano);
                ((StreamTask) this.owner).cancelables.registerCloseable(asyncCheckpointRunnable);
                ((StreamTask) this.owner).asyncOperationsThreadPool.execute(asyncCheckpointRunnable);
                if (StreamTask.LOG.isDebugEnabled()) {
                    StreamTask.LOG.debug("{} - finished synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(this.checkpointMetrics.getAlignmentDurationNanos() / 1000000), Long.valueOf(this.checkpointMetrics.getSyncDurationMillis())});
                }
            } catch (Exception e) {
                for (OperatorSnapshotFutures operatorSnapshotFutures : this.operatorSnapshotsInProgress.values()) {
                    if (null != operatorSnapshotFutures) {
                        try {
                            operatorSnapshotFutures.cancel();
                        } catch (Exception e2) {
                            StreamTask.LOG.warn("Could not properly cancel an operator snapshot result.", e2);
                        }
                    }
                }
                if (StreamTask.LOG.isDebugEnabled()) {
                    StreamTask.LOG.debug("{} - did NOT finish synchronous part of checkpoint {}. Alignment duration: {} ms, snapshot duration {} ms", new Object[]{this.owner.getName(), Long.valueOf(this.checkpointMetaData.getCheckpointId()), Long.valueOf(this.checkpointMetrics.getAlignmentDurationNanos() / 1000000), Long.valueOf(this.checkpointMetrics.getSyncDurationMillis())});
                }
                if (this.checkpointOptions.getCheckpointType().isSynchronous()) {
                    throw e;
                }
                ((StreamTask) this.owner).checkpointExceptionHandler.tryHandleCheckpointException(this.checkpointMetaData, e);
            }
        }

        private void checkpointStreamOperator(StreamOperator<?> streamOperator) throws Exception {
            if (null != streamOperator) {
                this.operatorSnapshotsInProgress.put(streamOperator.getOperatorID(), streamOperator.snapshotState(this.checkpointMetaData.getCheckpointId(), this.checkpointMetaData.getTimestamp(), this.checkpointOptions, this.storageLocation));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$StreamTaskAsyncExceptionHandler.class */
    public static class StreamTaskAsyncExceptionHandler {
        private final Environment environment;

        StreamTaskAsyncExceptionHandler(Environment environment) {
            this.environment = environment;
        }

        void handleAsyncException(String str, Throwable th) {
            this.environment.failExternally(new AsynchronousException(str, th));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Environment environment) {
        this(environment, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Environment environment, @Nullable ProcessingTimeService processingTimeService) {
        this(environment, processingTimeService, FatalExitExceptionHandler.INSTANCE);
    }

    protected StreamTask(Environment environment, @Nullable ProcessingTimeService processingTimeService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        super(environment);
        this.lock = new Object();
        this.cancelables = new CloseableRegistry();
        this.timerService = processingTimeService;
        this.uncaughtExceptionHandler = (Thread.UncaughtExceptionHandler) Preconditions.checkNotNull(uncaughtExceptionHandler);
        this.configuration = new StreamConfig(getTaskConfiguration());
        this.accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
        this.recordWriters = createRecordWriters(this.configuration, environment);
        this.syncSavepointLatch = new SynchronousSavepointLatch();
        this.mailbox = new MailboxImpl();
        this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
    }

    protected abstract void init() throws Exception;

    protected void cancelTask() throws Exception {
    }

    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.close();
        }
    }

    protected void processInput(StreamTask<OUT, OP>.ActionContext actionContext) throws Exception {
        if (this.inputProcessor.processInput()) {
            return;
        }
        actionContext.allActionsCompleted();
    }

    private void runAndHandleCancel() throws Exception {
        try {
            run();
        } catch (InterruptedException e) {
            if (this.canceled) {
                return;
            }
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e2) {
            if (!this.canceled) {
                throw e2;
            }
            LOG.warn("Error while canceling task.", e2);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:12:0x003f, code lost:
    
        continue;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void run() throws java.lang.Exception {
        /*
            r4 = this;
            org.apache.flink.streaming.runtime.tasks.StreamTask$ActionContext r0 = new org.apache.flink.streaming.runtime.tasks.StreamTask$ActionContext
            r1 = r0
            r2 = r4
            r1.<init>()
            r5 = r0
        L9:
            r0 = r4
            org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox r0 = r0.mailbox
            boolean r0 = r0.hasMail()
            if (r0 == 0) goto L3f
        L15:
            r0 = r4
            org.apache.flink.streaming.runtime.tasks.mailbox.Mailbox r0 = r0.mailbox
            java.util.Optional r0 = r0.tryTakeMail()
            r1 = r0
            r6 = r1
            boolean r0 = r0.isPresent()
            if (r0 == 0) goto L3f
            r0 = r6
            java.lang.Object r0 = r0.get()
            java.lang.Runnable r0 = (java.lang.Runnable) r0
            r7 = r0
            r0 = r7
            java.lang.Runnable r1 = org.apache.flink.streaming.runtime.tasks.StreamTask.POISON_LETTER
            if (r0 != r1) goto L36
            return
        L36:
            r0 = r7
            r0.run()
            goto L15
        L3f:
            r0 = r4
            r1 = r5
            r0.processInput(r1)
            goto L9
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.runtime.tasks.StreamTask.run():void");
    }

    protected void advanceToEndOfEventTime() throws Exception {
    }

    protected void finishTask() throws Exception {
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        return new StreamTaskStateInitializerImpl(getEnvironment(), this.stateBackend, this.timerService);
    }

    @VisibleForTesting
    SynchronousSavepointLatch getSynchronousSavepointLatch() {
        return this.syncSavepointLatch;
    }

    public final void invoke() throws Exception {
        try {
            LOG.debug("Initializing {}.", getName());
            this.asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", this.uncaughtExceptionHandler));
            this.checkpointExceptionHandler = createCheckpointExceptionHandlerFactory().createCheckpointExceptionHandler(getEnvironment());
            this.stateBackend = createStateBackend();
            this.checkpointStorage = this.stateBackend.createCheckpointStorage(getEnvironment().getJobID());
            if (this.timerService == null) {
                this.timerService = new SystemProcessingTimeService(this, getCheckpointLock(), new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName(), getUserCodeClassLoader()));
            }
            this.operatorChain = new OperatorChain<>(this, this.recordWriters);
            this.headOperator = this.operatorChain.getHeadOperator();
            init();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Invoking {}", getName());
            synchronized (this.lock) {
                initializeState();
                openAllOperators();
            }
            if (this.canceled) {
                throw new CancelTaskException();
            }
            this.isRunning = true;
            runAndHandleCancel();
            if (this.canceled) {
                throw new CancelTaskException();
            }
            LOG.debug("Finished task {}", getName());
            synchronized (this.lock) {
                closeAllOperators();
                this.timerService.quiesce();
                this.isRunning = false;
            }
            this.timerService.awaitPendingAfterQuiesce();
            LOG.debug("Closed operators for task {}", getName());
            this.operatorChain.flushOutputs();
            tryDisposeAllOperators();
            this.isRunning = false;
            setShouldInterruptOnCancel(false);
            Thread.interrupted();
            tryShutdownTimerService();
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable th) {
                LOG.error("Could not shut down async checkpoint threads", th);
            }
            try {
                cleanup();
            } catch (Throwable th2) {
                LOG.error("Error during cleanup of stream task", th2);
            }
            if (1 == 0) {
                disposeAllOperators();
            }
            if (this.operatorChain != null) {
                synchronized (this.lock) {
                    this.operatorChain.releaseOutputs();
                }
            }
        } catch (Throwable th3) {
            this.isRunning = false;
            setShouldInterruptOnCancel(false);
            Thread.interrupted();
            tryShutdownTimerService();
            try {
                this.cancelables.close();
                shutdownAsyncThreads();
            } catch (Throwable th4) {
                LOG.error("Could not shut down async checkpoint threads", th4);
            }
            try {
                cleanup();
            } catch (Throwable th5) {
                LOG.error("Error during cleanup of stream task", th5);
            }
            if (0 == 0) {
                disposeAllOperators();
            }
            if (this.operatorChain != null) {
                synchronized (this.lock) {
                    this.operatorChain.releaseOutputs();
                }
            }
            throw th3;
        }
    }

    public final void cancel() throws Exception {
        this.mailbox.clearAndPut(POISON_LETTER);
        this.isRunning = false;
        this.canceled = true;
        try {
            this.syncSavepointLatch.cancelCheckpointLatch();
            cancelTask();
        } finally {
            this.cancelables.close();
        }
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.open();
            }
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int length = allOperators.length - 1; length >= 0; length--) {
            StreamOperator<?> streamOperator = allOperators[length];
            if (streamOperator != null) {
                streamOperator.close();
            }
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (streamOperator != null) {
                streamOperator.dispose();
            }
        }
    }

    private void shutdownAsyncThreads() throws Exception {
        if (this.asyncOperationsThreadPool.isShutdown()) {
            return;
        }
        this.asyncOperationsThreadPool.shutdownNow();
    }

    private void disposeAllOperators() {
        if (this.operatorChain != null) {
            for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                if (streamOperator != null) {
                    try {
                        streamOperator.dispose();
                    } catch (Throwable th) {
                        LOG.error("Error during disposal of stream operator.", th);
                    }
                }
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null && !this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isSerializingTimestamps() {
        TimeCharacteristic timeCharacteristic = this.configuration.getTimeCharacteristic();
        return (timeCharacteristic == TimeCharacteristic.EventTime) | (timeCharacteristic == TimeCharacteristic.IngestionTime);
    }

    public String getName() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getTaskNameWithSubtaskAndId() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks() + " (" + getEnvironment().getExecutionId() + ')';
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.checkpointStorage;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean z) throws Exception {
        try {
            return performCheckpoint(checkpointMetaData, checkpointOptions, new CheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L), z);
        } catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), getName(), e});
            return false;
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        try {
            if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, false) && this.syncSavepointLatch.isSet()) {
                this.syncSavepointLatch.blockUntilCheckpointIsAcknowledged();
            }
        } catch (Exception e) {
            throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
        } catch (CancelTaskException e2) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(checkpointMetaData.getCheckpointId()));
            throw e2;
        }
    }

    public void abortCheckpointOnBarrier(long j, Throwable th) throws Exception {
        LOG.debug("Aborting checkpoint via cancel-barrier {} for task {}", Long.valueOf(j), getName());
        getEnvironment().declineCheckpoint(j, th);
        synchronized (this.lock) {
            this.operatorChain.broadcastCheckpointCancelMarker(j);
        }
    }

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics, boolean z) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), checkpointOptions.getCheckpointType(), getName()});
        long checkpointId = checkpointMetaData.getCheckpointId();
        synchronized (this.lock) {
            if (this.isRunning) {
                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    this.syncSavepointLatch.setCheckpointId(checkpointId);
                    if (z) {
                        advanceToEndOfEventTime();
                    }
                }
                this.operatorChain.prepareSnapshotPreBarrier(checkpointId);
                this.operatorChain.broadcastCheckpointBarrier(checkpointId, checkpointMetaData.getTimestamp(), checkpointOptions);
                checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
                return true;
            }
            CancelCheckpointMarker cancelCheckpointMarker = new CancelCheckpointMarker(checkpointMetaData.getCheckpointId());
            Exception exc = null;
            Iterator<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> it = this.recordWriters.iterator();
            while (it.hasNext()) {
                try {
                    it.next().broadcastEvent(cancelCheckpointMarker);
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(new Exception("Could not send cancel checkpoint marker to downstream tasks.", e), exc);
                }
            }
            if (exc != null) {
                throw exc;
            }
            return false;
        }
    }

    public ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        boolean z = false;
        synchronized (this.lock) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", getName());
                for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
                    if (streamOperator != null) {
                        streamOperator.notifyCheckpointComplete(j);
                    }
                }
                z = true;
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
            }
        }
        if (z) {
            this.syncSavepointLatch.acknowledgeCheckpointAndTrigger(j, this::finishTask);
        }
    }

    private void tryShutdownTimerService() {
        if (this.timerService == null || this.timerService.isTerminated()) {
            return;
        }
        try {
            long j = getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
            if (!this.timerService.shutdownServiceUninterruptible(j)) {
                LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", Long.valueOf(j));
            }
        } catch (Throwable th) {
            LOG.error("Could not shut down timer service", th);
        }
    }

    private void checkpointState(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        new CheckpointingOperation(this, checkpointMetaData, checkpointOptions, this.checkpointStorage.resolveCheckpointStorageLocation(checkpointMetaData.getCheckpointId(), checkpointOptions.getTargetLocation()), checkpointMetrics).executeCheckpointing();
    }

    private void initializeState() throws Exception {
        for (StreamOperator<?> streamOperator : this.operatorChain.getAllOperators()) {
            if (null != streamOperator) {
                streamOperator.initializeState();
            }
        }
    }

    private StateBackend createStateBackend() throws Exception {
        return StateBackendLoader.fromApplicationOrConfigOrDefault(this.configuration.getStateBackend(getUserCodeClassLoader()), getEnvironment().getTaskManagerInfo().getConfiguration(), getUserCodeClassLoader(), LOG);
    }

    protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
        return new CheckpointExceptionHandlerFactory();
    }

    public ProcessingTimeService getProcessingTimeService() {
        if (this.timerService == null) {
            throw new IllegalStateException("The timer service has not been initialized.");
        }
        return this.timerService;
    }

    @Override // org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler
    public void handleAsyncException(String str, Throwable th) {
        if (this.isRunning) {
            this.asyncExceptionHandler.handleAsyncException(str, th);
        }
    }

    public String toString() {
        return getName();
    }

    public CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    @VisibleForTesting
    public static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig streamConfig, Environment environment) {
        ArrayList arrayList = new ArrayList();
        List<StreamEdge> outEdgesInOrder = streamConfig.getOutEdgesInOrder(environment.getUserClassLoader());
        Map<Integer, StreamConfig> transitiveChainedTaskConfigsWithSelf = streamConfig.getTransitiveChainedTaskConfigsWithSelf(environment.getUserClassLoader());
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge streamEdge = outEdgesInOrder.get(i);
            arrayList.add(createRecordWriter(streamEdge, i, environment, environment.getTaskInfo().getTaskName(), transitiveChainedTaskConfigsWithSelf.get(Integer.valueOf(streamEdge.getSourceId())).getBufferTimeout()));
        }
        return arrayList;
    }

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(StreamEdge streamEdge, int i, Environment environment, String str, long j) {
        int numTargetKeyGroups;
        Serializable partitioner = streamEdge.getPartitioner();
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{partitioner, Integer.valueOf(i), str});
        ResultPartitionWriter writer = environment.getWriter(i);
        if ((partitioner instanceof ConfigurableStreamPartitioner) && 0 < (numTargetKeyGroups = writer.getNumTargetKeyGroups())) {
            ((ConfigurableStreamPartitioner) partitioner).configure(numTargetKeyGroups);
        }
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> build = new RecordWriterBuilder().setChannelSelector(partitioner).setTimeout(j).setTaskName(str).build(writer);
        build.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return build;
    }
}
