package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.Thread;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.configuration.StateChangelogOptionsInternal;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.AutoCloseableRegistry;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.security.FlinkSecurityManager;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.AvailabilityProvider;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.ChannelStateHolder;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.jobgraph.tasks.CoordinatedTask;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.runtime.taskmanager.AsynchronousException;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.graph.NonChainedOutput;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.GaugePeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxMetricsController;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.PeriodTimer;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails {
    /** Thread group handed to the {@link DispatcherThreadFactory} of every timer service this task creates. */
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    /** Executor supplied to the constructor; used to run operator finishing, gate restore and shutdown steps. */
    private final StreamTaskActionExecutor actionExecutor;

    /** Input processor driven by {@code processInput}; may be {@code null} (null-checked before snapshot and close). */
    @Nullable
    protected StreamInputProcessor inputProcessor;
    /** Main operator of {@link #operatorChain}; assigned in {@code restoreInternal}. */
    protected OP mainOperator;
    /** Operator chain created in {@code restoreInternal}. */
    protected OperatorChain<OUT, OP> operatorChain;
    /** Stream configuration built from the environment's task configuration. */
    protected final StreamConfig configuration;
    /** State backend produced by {@code createStateBackend()} in the constructor. */
    protected final StateBackend stateBackend;
    /** Checkpoint storage produced by {@code createCheckpointStorage(stateBackend)} in the constructor. */
    protected final CheckpointStorage checkpointStorage;
    /** Per-subtask checkpoint coordinator; closed via {@link #resourceCloser}. */
    private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
    /** Timer service: the one passed to the constructor, or a freshly created one when {@code null} was passed. */
    protected final TimerService timerService;
    /** Timer service always created by this task; used for buffer debloating and mailbox latency measurement. */
    protected final TimerService systemTimerService;
    /** Registry closed both on cancel and as part of {@link #resourceCloser}. */
    private final CloseableRegistry cancelables;
    /** Collects everything that must be released in {@code cleanUp} or when the constructor fails. */
    private final AutoCloseableRegistry resourceCloser;
    /** Forwards asynchronous failures to {@code Environment.failExternally}. */
    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
    /** Set once {@code restoreInternal} completes; cleared in {@code afterInvoke}, {@code cleanUp} and {@code cancel}. */
    private volatile boolean isRunning;
    /** {@code true} while {@code restoreInternal} is in progress. */
    private volatile boolean isRestoring;
    /** Set by {@code cancel}; checked by {@code ensureNotCanceled}. */
    private volatile boolean canceled;
    /** Set in {@code cleanUp}: {@code true} when cleaning up with a throwable while not canceled. */
    private volatile boolean failing;
    /** Set to {@code true} by {@code endData} after the operators were finished. */
    private boolean finishedOperators;
    /** Reset to {@code false} at the start of {@code restoreInternal}. */
    private boolean closedOperators;
    /** Pool named "AsyncOperations"; sized to max concurrent checkpoints + 1 in the constructor. */
    private final ExecutorService asyncOperationsThreadPool;
    /** Writer delegate created by {@code createRecordWriterDelegate}; its availability gates {@code processInput}. */
    protected final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
    /** Mailbox processor whose default action is {@code processInput}. */
    protected final MailboxProcessor mailboxProcessor;
    /** Main mailbox executor obtained from {@link #mailboxProcessor}. */
    final MailboxExecutor mainMailboxExecutor;
    /** Single-thread executor ("channel-state-unspilling") that reads input channel state during restore. */
    private final ExecutorService channelIOExecutor;
    /** Id of the current stop-with-savepoint checkpoint, or {@code null} when there is none. */
    private Long syncSavepoint;
    /** Initialised to {@code null}; not written elsewhere in this part of the file. */
    private Long finalCheckpointMinId;
    /** Awaited in {@code afterInvoke} when end of data was received or a synchronous savepoint is set. */
    private final CompletableFuture<Void> finalCheckpointCompleted;
    /** Starts at -1 and is set to the restored checkpoint id, if any, in {@code restoreInternal}. */
    private long latestReportCheckpointId;
    private long latestAsyncCheckpointStartDelayNanos;
    /** Set to {@code true} at the end of {@code endData}. */
    private volatile boolean endOfDataReceived;
    /** Buffer debloat period in milliseconds, read from {@code TaskManagerOptions.BUFFER_DEBLOAT_PERIOD}. */
    private final long bufferDebloatPeriod;
    /** Environment this task was constructed with. */
    private final Environment environment;
    /** Monitor guarding {@link #shouldInterruptOnCancel}. */
    private final Object shouldInterruptOnCancelLock;

    /** Initialised to {@code true} in the constructor. */
    @GuardedBy("shouldInterruptOnCancelLock")
    private boolean shouldInterruptOnCancel;

    /** Availability of the changelog writer, or {@code null} when the task state manager has no changelog storage. */
    @Nullable
    private final AvailabilityProvider changelogWriterAvailabilityProvider;

    /* renamed from: org.apache.flink.streaming.runtime.tasks.StreamTask$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$1.class */
    /**
     * Compiler-synthesised lookup table for switching over {@link DataInputStatus}.
     *
     * <p>The array is indexed by {@code ordinal()} and holds a dense case label: 1 = MORE_AVAILABLE,
     * 2 = NOTHING_AVAILABLE, 3 = END_OF_RECOVERY, 4 = STOPPED, 5 = END_OF_DATA, 6 = END_OF_INPUT. Any
     * constant not listed here keeps the array default of 0.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus = new int[DataInputStatus.values().length];

        static {
            // Each assignment is wrapped separately so that a constant missing at run time
            // (NoSuchFieldError) leaves only its own entry unmapped.
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.MORE_AVAILABLE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.NOTHING_AVAILABLE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.END_OF_RECOVERY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.STOPPED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.END_OF_DATA.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$runtime$io$DataInputStatus[DataInputStatus.END_OF_INPUT.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /**
     * Single-method predicate with no arguments.
     *
     * <p>NOTE(review): judging by the name it answers whether a batch of records may be emitted
     * right now; callers are outside this part of the file, so confirm there.
     */
    @FunctionalInterface
    public interface CanEmitBatchOfRecordsChecker {

        /** @return the checker's verdict */
        boolean check();
    }

    /**
     * Runnable that resumes a suspended mailbox default action and, when a timer is supplied,
     * brackets the suspension with {@code markStart()} (on construction) and {@code markEnd()}
     * (on run).
     */
    private static class ResumeWrapper implements Runnable {
        private final MailboxDefaultAction.Suspension suspendedDefaultAction;

        @Nullable
        private final PeriodTimer timer;

        /**
         * @param suspension the suspension to resume when this wrapper runs
         * @param periodTimer optional timer; started immediately when non-null
         */
        public ResumeWrapper(MailboxDefaultAction.Suspension suspension, @Nullable PeriodTimer periodTimer) {
            this.suspendedDefaultAction = suspension;
            this.timer = periodTimer;
            if (periodTimer == null) {
                return;
            }
            periodTimer.markStart();
        }

        /** Stops the timer (if any) and then resumes the default action. */
        @Override
        public void run() {
            final PeriodTimer started = this.timer;
            if (started != null) {
                started.markEnd();
            }
            this.suspendedDefaultAction.resume();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * {@link AsyncExceptionHandler} that fails the task through its {@link Environment}: every
     * reported throwable is wrapped into an {@link AsynchronousException} and passed to
     * {@code failExternally}.
     */
    public static class StreamTaskAsyncExceptionHandler implements AsyncExceptionHandler {
        private final Environment environment;

        /** @param environment environment whose {@code failExternally} receives the failures */
        StreamTaskAsyncExceptionHandler(Environment environment) {
            this.environment = environment;
        }

        /**
         * Wraps the failure and reports it to the environment.
         *
         * @param str message for the wrapping {@link AsynchronousException}
         * @param th the underlying cause
         */
        public void handleAsyncException(String str, Throwable th) {
            final AsynchronousException wrapped = new AsynchronousException(str, th);
            this.environment.failExternally(wrapped);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a task without an externally supplied timer service; delegates with a {@code null}
     * timer service so that the task creates its own.
     *
     * @param environment the task environment
     * @throws Exception if the delegated constructor fails
     */
    public StreamTask(Environment environment) throws Exception {
        this(
                environment,
                null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a task that uses {@link FatalExitExceptionHandler#INSTANCE} as uncaught exception
     * handler.
     *
     * @param environment the task environment
     * @param timerService timer service to use, or {@code null} to let the task create one
     * @throws Exception if the delegated constructor fails
     */
    public StreamTask(Environment environment, @Nullable TimerService timerService) throws Exception {
        this(
                environment,
                timerService,
                FatalExitExceptionHandler.INSTANCE);
    }

    /**
     * Creates a task that uses {@link StreamTaskActionExecutor#IMMEDIATE} as action executor.
     *
     * @param environment the task environment
     * @param timerService timer service to use, or {@code null} to let the task create one
     * @param uncaughtExceptionHandler handler for the async-operations thread pool
     * @throws Exception if the delegated constructor fails
     */
    protected StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
            throws Exception {
        this(
                environment,
                timerService,
                uncaughtExceptionHandler,
                StreamTaskActionExecutor.IMMEDIATE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a task whose mailbox is a new {@link TaskMailboxImpl} bound to the thread that is
     * running this constructor.
     *
     * @param environment the task environment
     * @param timerService timer service to use, or {@code null} to let the task create one
     * @param uncaughtExceptionHandler handler for the async-operations thread pool
     * @param streamTaskActionExecutor executor used for task actions
     * @throws Exception if the delegated constructor fails
     */
    public StreamTask(
            Environment environment,
            @Nullable TimerService timerService,
            Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
            StreamTaskActionExecutor streamTaskActionExecutor)
            throws Exception {
        this(
                environment,
                timerService,
                uncaughtExceptionHandler,
                streamTaskActionExecutor,
                new TaskMailboxImpl(Thread.currentThread()));
    }

    /**
     * Primary constructor: wires up the mailbox, executors, record writers, state backend,
     * checkpoint storage, timer services and the subtask checkpoint coordinator.
     *
     * <p>Every resource that needs releasing is registered with {@code resourceCloser} as soon as
     * it exists. If any step throws an {@link Exception}, the registry is closed, close failures
     * are attached to the original exception as suppressed, and the original exception is rethrown.
     *
     * @param environment the task environment
     * @param timerService timer service to use, or {@code null} to create one named after the task
     * @param uncaughtExceptionHandler handler installed on the "AsyncOperations" thread factory
     * @param streamTaskActionExecutor executor for task actions; must not be {@code null}
     * @param taskMailbox mailbox backing the {@link MailboxProcessor}
     * @throws Exception if any initialisation step fails
     */
    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor streamTaskActionExecutor, TaskMailbox taskMailbox) throws Exception {
        // Field defaults that do not depend on the environment.
        this.cancelables = new CloseableRegistry();
        this.syncSavepoint = null;
        this.finalCheckpointMinId = null;
        this.finalCheckpointCompleted = new CompletableFuture<>();
        this.latestReportCheckpointId = -1L;
        this.endOfDataReceived = false;
        this.shouldInterruptOnCancelLock = new Object();
        this.shouldInterruptOnCancel = true;
        // Created before the try block so the catch handler can always close it.
        this.resourceCloser = new AutoCloseableRegistry();
        try {
            this.environment = environment;
            this.configuration = new StreamConfig(environment.getTaskConfiguration());
            // Mailbox metrics: latency histogram, processed-mails counter and a size gauge.
            MailboxMetricsController mailboxMetricsController = new MailboxMetricsController(environment.getMetricGroup().getIOMetricGroup().getMailboxLatency(), environment.getMetricGroup().getIOMetricGroup().getNumMailsProcessedCounter());
            environment.getMetricGroup().getIOMetricGroup().registerMailboxSizeSupplier(() -> {
                return Integer.valueOf(taskMailbox.size());
            });
            // processInput is the mailbox's default action.
            this.mailboxProcessor = new MailboxProcessor(this::processInput, taskMailbox, streamTaskActionExecutor, mailboxMetricsController);
            this.resourceCloser.registerCloseable(this.mailboxProcessor);
            this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling"));
            AutoCloseableRegistry autoCloseableRegistry = this.resourceCloser;
            ExecutorService executorService = this.channelIOExecutor;
            // Decompiler rendering of the implicit null check for the method reference below.
            executorService.getClass();
            autoCloseableRegistry.registerCloseable(executorService::shutdown);
            this.recordWriter = createRecordWriterDelegate(this.configuration, environment);
            this.resourceCloser.registerCloseable(this::releaseOutputResources);
            this.resourceCloser.registerCloseable(this::closeAllOperators);
            this.resourceCloser.registerCloseable(this::cleanUpInternal);
            this.actionExecutor = (StreamTaskActionExecutor) Preconditions.checkNotNull(streamTaskActionExecutor);
            this.mainMailboxExecutor = this.mailboxProcessor.getMainMailboxExecutor();
            this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
            // 0 core threads, at most (max concurrent checkpoints + 1) threads, 60s keep-alive.
            this.asyncOperationsThreadPool = new ThreadPoolExecutor(0, this.configuration.getMaxConcurrentCheckpoints() + 1, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
            this.resourceCloser.registerCloseable(this::shutdownAsyncThreads);
            this.resourceCloser.registerCloseable(this.cancelables);
            // Publish the executors to the environment.
            environment.setMainMailboxExecutor(this.mainMailboxExecutor);
            environment.setAsyncOperationsThreadPool(this.asyncOperationsThreadPool);
            this.stateBackend = createStateBackend();
            this.checkpointStorage = createCheckpointStorage(this.stateBackend);
            // No changelog storage => no changelog availability to wait on.
            this.changelogWriterAvailabilityProvider = environment.getTaskStateManager().getStateChangelogStorage() == null ? null : environment.getTaskStateManager().getStateChangelogStorage().getAvailabilityProvider();
            CheckpointStorageAccess createCheckpointStorage = this.checkpointStorage.createCheckpointStorage(getEnvironment().getJobID());
            environment.setCheckpointStorageAccess(createCheckpointStorage);
            if (timerService == null) {
                this.timerService = createTimerService("Time Trigger for " + getName());
            } else {
                this.timerService = timerService;
            }
            // The system timer service is always created by the task itself.
            this.systemTimerService = createTimerService("System Time Trigger for " + getName());
            this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl(this.checkpointStorage, createCheckpointStorage, getName(), streamTaskActionExecutor, getAsyncOperationsThreadPool(), environment, this, this.configuration.isUnalignedCheckpointsEnabled(), ((Boolean) this.configuration.getConfiguration().get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH)).booleanValue(), (v1, v2) -> {
                return prepareInputSnapshot(v1, v2);
            }, this.configuration.getMaxConcurrentCheckpoints(), BarrierAlignmentUtil.createRegisterTimerCallback(this.mainMailboxExecutor, this.systemTimerService), this.configuration.getMaxSubtasksPerChannelStateFile());
            AutoCloseableRegistry autoCloseableRegistry2 = this.resourceCloser;
            SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = this.subtaskCheckpointCoordinator;
            // Decompiler rendering of the implicit null check for the method reference below.
            subtaskCheckpointCoordinator.getClass();
            autoCloseableRegistry2.registerCloseable(subtaskCheckpointCoordinator::close);
            this.resourceCloser.registerCloseable(this::tryShutdownTimerService);
            injectChannelStateWriterIntoChannels();
            environment.getMetricGroup().getIOMetricGroup().setEnableBusyTime(true);
            this.bufferDebloatPeriod = ((Duration) environment.getTaskManagerInfo().getConfiguration().get(TaskManagerOptions.BUFFER_DEBLOAT_PERIOD)).toMillis();
            mailboxMetricsController.setupLatencyMeasurement(this.systemTimerService, this.mainMailboxExecutor);
        } catch (Exception e) {
            // Release whatever was registered so far; keep the original failure as the primary one.
            try {
                this.resourceCloser.close();
            } catch (Throwable th) {
                e.addSuppressed(th);
            }
            throw e;
        }
    }

    /**
     * Builds a {@link SystemProcessingTimeService} whose threads come from a
     * {@link DispatcherThreadFactory} in {@link #TRIGGER_THREAD_GROUP} and whose failures are
     * routed to {@code handleTimerException}.
     *
     * @param str thread name passed to the thread factory
     * @return the new timer service
     */
    private TimerService createTimerService(String str) {
        final DispatcherThreadFactory threadFactory = new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, str);
        return new SystemProcessingTimeService(this::handleTimerException, threadFactory);
    }

    /**
     * Hands the checkpoint coordinator's {@link ChannelStateWriter} to every input gate and to
     * every result partition writer that is able to hold one.
     *
     * <p>Writers are iterated as {@link ResultPartitionWriter} (the element type used for
     * {@code getAllWriters()} everywhere else in this class) and are cast only after the
     * {@code instanceof} check, so writers that are not {@link ChannelStateHolder}s are skipped.
     */
    private void injectChannelStateWriterIntoChannels() {
        final Environment env = getEnvironment();
        final ChannelStateWriter channelStateWriter = this.subtaskCheckpointCoordinator.getChannelStateWriter();
        for (InputGate gate : env.getAllInputGates()) {
            gate.setChannelStateWriter(channelStateWriter);
        }
        for (ResultPartitionWriter writer : env.getAllWriters()) {
            if (writer instanceof ChannelStateHolder) {
                ((ChannelStateHolder) writer).setChannelStateWriter(channelStateWriter);
            }
        }
    }

    /**
     * Asks the input processor to prepare a snapshot for the given checkpoint.
     *
     * @param channelStateWriter writer forwarded to the input processor
     * @param j checkpoint id forwarded to the input processor
     * @return the processor's future, or an already completed future when there is no input processor
     * @throws CheckpointException if the input processor rejects the snapshot
     */
    private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long j) throws CheckpointException {
        if (this.inputProcessor == null) {
            return FutureUtils.completedVoidFuture();
        }
        return this.inputProcessor.prepareSnapshot(channelStateWriter, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return the subtask checkpoint coordinator created in the constructor */
    public SubtaskCheckpointCoordinator getCheckpointCoordinator() {
        final SubtaskCheckpointCoordinator coordinator = this.subtaskCheckpointCoordinator;
        return coordinator;
    }

    /**
     * Subclass initialisation hook. {@code restoreInternal} calls it after the operator chain and
     * main operator have been set and before the input gates are restored.
     *
     * @throws Exception if initialisation fails
     */
    protected abstract void init() throws Exception;

    /**
     * Cancellation hook for subclasses; the default implementation does nothing. It is called from
     * {@code cancel()} and from {@code cleanUp(Throwable)} when that is given a non-null throwable.
     *
     * @throws Exception if the subclass fails to cancel
     */
    protected void cancelTask() throws Exception {
    }

    /**
     * Default mailbox action: processes one step of input and decides whether the default action
     * should keep running, end the data flow, or be suspended until something becomes available.
     *
     * <ul>
     *   <li>{@code MORE_AVAILABLE}: returns immediately if {@code taskIsAvailable()}, otherwise
     *       falls through to the availability handling below.
     *   <li>{@code NOTHING_AVAILABLE}: falls through to the availability handling below.
     *   <li>{@code END_OF_RECOVERY}: illegal here.
     *   <li>{@code STOPPED}: ends data without draining.
     *   <li>{@code END_OF_DATA}: ends data with draining and notifies the task manager.
     *   <li>{@code END_OF_INPUT}: suspends the default action and the mailbox loop.
     * </ul>
     *
     * <p>Availability handling suspends the default action until, in priority order, the record
     * writer, the input processor, or the changelog writer becomes available. The first two waits
     * are measured with a {@link GaugePeriodTimer}; the changelog wait is not timed.
     *
     * @param controller controller used to suspend the default action
     * @throws IllegalStateException if the input processor reports {@code END_OF_RECOVERY}
     * @throws Exception if input processing or ending the data fails
     */
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        final DataInputStatus status = this.inputProcessor.processInput();
        switch (status) {
            case MORE_AVAILABLE:
                if (taskIsAvailable()) {
                    return;
                }
                break;
            case NOTHING_AVAILABLE:
                break;
            case END_OF_RECOVERY:
                throw new IllegalStateException("We should not receive this event here.");
            case STOPPED:
                endData(StopMode.NO_DRAIN);
                return;
            case END_OF_DATA:
                endData(StopMode.DRAIN);
                notifyEndOfData();
                return;
            case END_OF_INPUT:
                controller.suspendDefaultAction();
                this.mailboxProcessor.suspend();
                return;
            default:
                // Any other status is treated like NOTHING_AVAILABLE.
                break;
        }

        final TaskIOMetricGroup ioMetrics = getEnvironment().getMetricGroup().getIOMetricGroup();
        final GaugePeriodTimer suspensionTimer;
        final CompletableFuture<?> resumeFuture;
        if (!this.recordWriter.isAvailable()) {
            // Output cannot accept records: account the wait as soft back pressure.
            suspensionTimer = new GaugePeriodTimer(ioMetrics.getSoftBackPressuredTimePerSecond());
            resumeFuture = this.recordWriter.getAvailableFuture();
        } else if (!this.inputProcessor.isAvailable()) {
            // Nothing to read: account the wait as idle time.
            suspensionTimer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
            resumeFuture = this.inputProcessor.getAvailableFuture();
        } else if (this.changelogWriterAvailabilityProvider != null) {
            // Waiting on the changelog writer is not attributed to any timer.
            suspensionTimer = null;
            resumeFuture = this.changelogWriterAvailabilityProvider.getAvailableFuture();
        } else {
            // Input and output are both available and there is no changelog writer: just retry.
            return;
        }
        FutureUtils.assertNoException(
                resumeFuture.thenRun(
                        new ResumeWrapper(controller.suspendDefaultAction(suspensionTimer), suspensionTimer)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Ends the data flow of this task: optionally advances event time, finishes the operators,
     * tells every result partition writer that data has ended, and finally records that end of
     * data was received.
     *
     * @param stopMode {@code DRAIN} additionally triggers {@code advanceToEndOfEventTime()}
     * @throws Exception if any of the steps fails
     */
    public void endData(StopMode stopMode) throws Exception {
        final boolean draining = stopMode == StopMode.DRAIN;
        if (draining) {
            advanceToEndOfEventTime();
        }

        this.operatorChain.finishOperators(this.actionExecutor, stopMode);
        this.finishedOperators = true;

        for (ResultPartitionWriter writer : getEnvironment().getAllWriters()) {
            writer.notifyEndOfData(stopMode);
        }

        this.endOfDataReceived = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Reports end of data for this execution to the environment's task manager actions. */
    public void notifyEndOfData() {
        final Environment env = this.environment;
        env.getTaskManagerActions().notifyEndOfData(env.getExecutionId());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Records the id of the stop-with-savepoint checkpoint. Setting the same id again is allowed;
     * setting a different id while one is recorded fails the state check.
     *
     * @param j checkpoint id of the synchronous savepoint
     * @throws IllegalStateException if a different synchronous savepoint id is already recorded
     */
    public void setSynchronousSavepoint(long j) {
        final boolean unsetOrSame = this.syncSavepoint == null || this.syncSavepoint.longValue() == j;
        Preconditions.checkState(unsetOrSame, "at most one stop-with-savepoint checkpoint at a time is allowed");
        this.syncSavepoint = Long.valueOf(j);
    }

    /** @return the recorded synchronous savepoint id, or an empty optional when none is recorded */
    @VisibleForTesting
    OptionalLong getSynchronousSavepointId() {
        if (this.syncSavepoint == null) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(this.syncSavepoint.longValue());
    }

    /**
     * @param j checkpoint id to compare
     * @return {@code true} only if a synchronous savepoint is recorded and its id equals {@code j}
     */
    private boolean isCurrentSyncSavepoint(long j) {
        if (this.syncSavepoint == null) {
            return false;
        }
        return this.syncSavepoint.longValue() == j;
    }

    /**
     * Hook invoked by {@code endData} when the stop mode is {@code DRAIN}, before the operators
     * are finished. The default implementation does nothing.
     *
     * @throws Exception if a subclass fails while advancing event time
     */
    protected void advanceToEndOfEventTime() throws Exception {
    }

    /**
     * Creates the state initializer for this task's operators. It uses the timer service provider
     * from the stream configuration when one is configured and falls back to
     * {@code InternalTimeServiceManagerImpl::create} otherwise; the last argument is a supplier
     * that reports whether the task has been canceled.
     *
     * @return a new {@link StreamTaskStateInitializerImpl}
     */
    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        final InternalTimeServiceManager.Provider configuredProvider =
                this.configuration.getTimerServiceProvider(getUserCodeClassLoader());
        return new StreamTaskStateInitializerImpl(
                getEnvironment(),
                this.stateBackend,
                TtlTimeProvider.DEFAULT,
                configuredProvider != null ? configuredProvider : InternalTimeServiceManagerImpl::create,
                () -> this.canceled);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Looks up the operator's "records in" counter. Metric setup is best effort: on any exception
     * a warning is logged and a standalone {@link SimpleCounter} is returned instead.
     *
     * @param streamOperator operator whose IO metric group is queried
     * @return the operator's counter, or a fresh {@link SimpleCounter} on failure
     */
    public Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
        Counter numRecordsIn;
        try {
            numRecordsIn = streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            numRecordsIn = new SimpleCounter();
        }
        return numRecordsIn;
    }

    /**
     * Public entry point for restoring the task; delegates to {@code restoreInternal()}.
     *
     * @throws Exception if restoring fails
     */
    public final void restore() throws Exception {
        this.restoreInternal();
    }

    /**
     * Restores the task: builds the operator chain, runs {@code init()}, restores the gates and
     * runs the mailbox loop until that restore suspends it, then marks the task as running.
     *
     * <p>A call while the task is already running is ignored (logged at debug level).
     *
     * @throws CancelTaskException if the task was canceled during restore
     * @throws IllegalStateException if the mailbox loop ended before the gate restore future completed
     * @throws Exception if any restore step fails
     */
    void restoreInternal() throws Exception {
        if (this.isRunning) {
            LOG.debug("Re-restore attempt rejected.");
            return;
        }
        this.isRestoring = true;
        this.closedOperators = false;
        LOG.debug("Initializing {}.", getName());
        // A task deployed as finished gets a FinishedOperatorChain instead of a regular one.
        this.operatorChain = getEnvironment().getTaskStateManager().isTaskDeployedAsFinished() ? new FinishedOperatorChain<>(this, this.recordWriter) : new RegularOperatorChain<>(this, this.recordWriter);
        this.mainOperator = this.operatorChain.getMainOperator();
        // Seed the last reported checkpoint id with the checkpoint we restore from, if any.
        getEnvironment().getTaskStateManager().getRestoreCheckpointId().ifPresent(l -> {
            this.latestReportCheckpointId = l.longValue();
        });
        init();
        ensureNotCanceled();
        LOG.debug("Invoking {}", getName());
        CompletableFuture completableFuture = (CompletableFuture) this.actionExecutor.call(this::restoreGates);
        // restoreGates() arranges for the mailbox processor to be suspended once all gates
        // have consumed their state, which ends this loop.
        this.mailboxProcessor.runMailboxLoop();
        ensureNotCanceled();
        Preconditions.checkState(completableFuture.isDone(), "Mailbox loop interrupted before recovery was finished.");
        // The channel IO executor is only needed during restore.
        this.channelIOExecutor.shutdown();
        this.isRunning = true;
        this.isRestoring = false;
    }

    /**
     * Restores channel state and opens the operators.
     *
     * <p>Output channel state is read synchronously, then operator state is initialised and the
     * operators are opened. Input channel state is read on the channel IO executor; a failure
     * there is reported through the async exception handler. Each gate requests its partitions
     * (via a mail on the main mailbox executor) once its state has been consumed.
     *
     * @return a future that completes after all gates consumed their state and the mailbox
     *     processor was asked to suspend
     * @throws Exception if reading output state or opening the operators fails
     */
    private CompletableFuture<Void> restoreGates() throws Exception {
        final SequentialChannelStateReader stateReader =
                getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
        stateReader.readOutputData(getEnvironment().getAllWriters(), !this.configuration.isGraphContainingLoops());

        this.operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

        final InputGate[] gates = getEnvironment().getAllInputGates();
        this.channelIOExecutor.execute(() -> {
            try {
                stateReader.readInputData(gates);
            } catch (Exception e) {
                this.asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
            }
        });

        final CompletableFuture<?>[] stateConsumed = new CompletableFuture<?>[gates.length];
        for (int i = 0; i < gates.length; i++) {
            final InputGate gate = gates[i];
            stateConsumed[i] = gate.getStateConsumedFuture();
            gate.getStateConsumedFuture()
                    .thenRun(() -> this.mainMailboxExecutor.execute(gate::requestPartitions, "Input gate request partitions"));
        }

        final CompletableFuture<Void> allConsumed = CompletableFuture.allOf(stateConsumed);
        return allConsumed.thenRun(this.mailboxProcessor::suspend);
    }

    /**
     * Aborts the calling step when the task has been canceled.
     *
     * @throws CancelTaskException if the {@code canceled} flag is set
     */
    private void ensureNotCanceled() {
        if (!this.canceled) {
            return;
        }
        throw new CancelTaskException();
    }

    /**
     * Runs the task: restores it first if that has not happened yet, schedules buffer debloating,
     * marks the task start in the IO metrics, runs the mailbox loop and finally performs the
     * regular shutdown in {@code afterInvoke()}.
     *
     * @throws CancelTaskException if the task is canceled before or after the mailbox loop
     * @throws Exception if restore, processing or shutdown fails
     */
    public final void invoke() throws Exception {
        final boolean alreadyRestored = this.isRunning;
        if (!alreadyRestored) {
            LOG.debug("Restoring during invoke will be called.");
            restoreInternal();
        }

        ensureNotCanceled();

        scheduleBufferDebloater();
        getEnvironment().getMetricGroup().getIOMetricGroup().markTaskStart();

        runMailboxLoop();

        ensureNotCanceled();
        afterInvoke();
    }

    /**
     * Schedules the next buffer debloating round on the system timer service. Nothing is
     * scheduled when the task has no input gates or when debloating is disabled in the task
     * manager configuration. When the timer fires, a mail is enqueued that debloats and then
     * re-schedules itself.
     */
    private void scheduleBufferDebloater() {
        if (getEnvironment().getAllInputGates().length == 0) {
            return;
        }
        if (!this.environment.getTaskManagerInfo().getConfiguration().getBoolean(TaskManagerOptions.BUFFER_DEBLOAT_ENABLED)) {
            return;
        }

        final long nextRun = this.systemTimerService.getCurrentProcessingTime() + this.bufferDebloatPeriod;
        this.systemTimerService.registerTimer(
                nextRun,
                j -> this.mainMailboxExecutor.execute(
                        () -> {
                            debloat();
                            scheduleBufferDebloater();
                        },
                        "Buffer size recalculation"));
    }

    /** Triggers debloating on every input gate of the environment. */
    @VisibleForTesting
    void debloat() {
        for (IndexedInputGate gate : this.environment.getAllInputGates()) {
            gate.triggerDebloating();
        }
    }

    /**
     * Runs a single step of the mailbox processor.
     *
     * @return whatever {@code MailboxProcessor.runMailboxStep()} returns
     * @throws Exception if the step fails
     */
    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        final boolean stepResult = this.mailboxProcessor.runMailboxStep();
        return stepResult;
    }

    /** @return the mailbox processor's own view of whether its loop is running */
    @VisibleForTesting
    public boolean isMailboxLoopRunning() {
        final boolean loopRunning = this.mailboxProcessor.isMailboxLoopRunning();
        return loopRunning;
    }

    /**
     * Runs the mailbox processor's loop on the calling thread.
     *
     * @throws Exception if the loop terminates with a failure
     */
    public void runMailboxLoop() throws Exception {
        final MailboxProcessor processor = this.mailboxProcessor;
        processor.runMailboxLoop();
    }

    /**
     * Regular shutdown sequence executed after the main mailbox loop ended.
     *
     * <p>Steps, in order: wait for the completion future (ignoring its failure); collect the
     * futures that must finish before the mailbox may terminate and run the mailbox loop until
     * they have; quiesce both timer services and prepare the mailbox for closing; drain the
     * mailbox; clear {@code isRunning}; flush outputs; optionally wait for pending checkpoints;
     * disable interrupt-on-cancel; close all operators.
     *
     * @throws Exception if any shutdown step fails
     */
    protected void afterInvoke() throws Exception {
        LOG.debug("Finished task {}", getName());
        // A failed completion future is mapped to null so join() never throws here.
        getCompletionFuture().exceptionally(th -> {
            return null;
        }).join();
        HashSet hashSet = new HashSet();
        if (this.endOfDataReceived && areCheckpointsWithFinishedTasksEnabled()) {
            LOG.debug("Waiting for all the records processed by the downstream tasks.");
            for (ResultPartitionWriter resultPartitionWriter : getEnvironment().getAllWriters()) {
                hashSet.add(resultPartitionWriter.getAllDataProcessedFuture());
            }
            hashSet.add(this.finalCheckpointCompleted);
        }
        // Being a set, adding finalCheckpointCompleted a second time has no effect.
        if (this.syncSavepoint != null) {
            hashSet.add(this.finalCheckpointCompleted);
        }
        FutureUtils.ConjunctFuture waitForAll = FutureUtils.waitForAll(hashSet);
        MailboxProcessor mailboxProcessor = this.mailboxProcessor;
        // Decompiler rendering of the implicit null check for the method reference below.
        mailboxProcessor.getClass();
        // Once everything collected above is done, let the mailbox loop below terminate.
        waitForAll.thenRun(mailboxProcessor::allActionsCompleted);
        this.mailboxProcessor.runMailboxLoop();
        this.actionExecutor.runThrowing(() -> {
            // Block until both timer services have quiesced before preparing the mailbox for closing.
            this.timerService.quiesce().get();
            this.systemTimerService.quiesce().get();
            this.mailboxProcessor.prepareClose();
        });
        this.mailboxProcessor.drain();
        this.actionExecutor.runThrowing(() -> {
            this.isRunning = false;
        });
        LOG.debug("Finished operators for task {}", getName());
        this.operatorChain.flushOutputs();
        if (areCheckpointsWithFinishedTasksEnabled()) {
            this.subtaskCheckpointCoordinator.waitForPendingCheckpoints();
            LOG.debug("All pending checkpoints are finished");
        }
        disableInterruptOnCancel();
        closeAllOperators();
    }

    /**
     * @return {@code true} only if the "checkpoints after tasks finish" option is set and
     *     checkpointing itself is enabled in the stream configuration
     */
    private boolean areCheckpointsWithFinishedTasksEnabled() {
        final Boolean optionEnabled =
                (Boolean) this.configuration.getConfiguration().get(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH);
        if (!optionEnabled.booleanValue()) {
            return false;
        }
        return this.configuration.isCheckpointingEnabled();
    }

    /**
     * Releases the resources held by this task.
     *
     * <p>If a failure cause is given, {@code cancelTask()} is invoked first and any error it raises
     * is remembered. The method then disables interrupt-on-cancel, waits for the completion future
     * (ignoring its failure), marks the task as not running, clears the thread's interrupt flag and
     * closes {@code resourceCloser}.
     *
     * @param th the failure that led to the cleanup, or {@code null} if there was none
     * @throws Exception if closing {@code resourceCloser} fails; the remembered {@code cancelTask()}
     *     failure, if any, is combined with it through {@code ExceptionUtils.firstOrSuppressed}
     */
    public final void cleanUp(Throwable th) throws Exception {
        LOG.debug("Cleanup StreamTask (operators closed: {}, cancelled: {})", Boolean.valueOf(this.closedOperators), Boolean.valueOf(this.canceled));
        // The task counts as failing only when it ends with an error without having been canceled.
        this.failing = !this.canceled && th != null;

        Exception cancelFailure = null;
        if (th != null) {
            try {
                cancelTask();
            } catch (Throwable cancelError) {
                if (cancelError instanceof Exception) {
                    cancelFailure = (Exception) cancelError;
                } else {
                    cancelFailure = new Exception(cancelError);
                }
            }
        }

        disableInterruptOnCancel();
        // Wait for completion; a failed completion future must not abort the cleanup.
        getCompletionFuture().exceptionally(ignored -> null).join();
        this.isRunning = false;
        // Reset the interrupt flag of the current thread before closing resources.
        Thread.interrupted();

        try {
            this.resourceCloser.close();
        } catch (Throwable closeError) {
            Exception closeFailure = closeError instanceof Exception ? (Exception) closeError : new Exception(closeError);
            // NOTE(review): cancelFailure is only surfaced here, i.e. when close() fails as well.
            throw ((Exception) ExceptionUtils.firstOrSuppressed(closeFailure, cancelFailure));
        }
    }

    /**
     * Closes the input processor if one has been created.
     *
     * @throws Exception if closing the input processor fails
     */
    protected void cleanUpInternal() throws Exception {
        if (this.inputProcessor == null) {
            return;
        }
        this.inputProcessor.close();
    }

    /**
     * Returns the future that {@code cleanUp} joins on and that {@code cancel} attaches its
     * cleanup callback to.
     *
     * <p>This implementation returns {@code FutureUtils.completedVoidFuture()} (presumably an
     * already-completed future - confirm against FutureUtils). The method is {@code protected} and
     * not {@code final}, so subclasses can supply a different future.
     *
     * @return the completion future of this task
     */
    protected CompletableFuture<Void> getCompletionFuture() {
        return FutureUtils.completedVoidFuture();
    }

    /**
     * Cancels this task.
     *
     * <p>Marks the task as not running and canceled, then calls {@code cancelTask()} while
     * user-triggered system exits are monitored for the current thread. Regardless of whether
     * {@code cancelTask()} succeeds, monitoring is switched off again and a callback is attached to
     * the completion future that signals the mailbox processor that all actions are completed,
     * cancels the subtask checkpoint coordinator and closes the cancelables registry.
     *
     * @throws Exception whatever {@code cancelTask()} throws
     */
    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            cancelTask();
        } finally {
            // A single finally block guarantees the cleanup below is registered exactly once,
            // on the success path as well as on the failure path.
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
            getCompletionFuture().whenComplete((unusedResult, unusedError) -> {
                this.mailboxProcessor.allActionsCompleted();
                try {
                    this.subtaskCheckpointCoordinator.cancel();
                    this.cancelables.close();
                } catch (IOException e) {
                    // whenComplete callbacks cannot throw checked exceptions.
                    throw new CompletionException(e);
                }
            });
        }
    }

    /**
     * Returns a factory backed by the mailbox processor's {@code getMailboxExecutor} method.
     *
     * <p>The mailbox processor is read (and implicitly null-checked) when this method is called,
     * not when the factory is later used.
     *
     * @return a factory bound to the current mailbox processor
     */
    public MailboxExecutorFactory getMailboxExecutorFactory() {
        return this.mailboxProcessor::getMailboxExecutor;
    }

    /**
     * Checks whether the task can currently produce output.
     *
     * @return {@code true} if the record writer is available and the changelog writer availability
     *     provider is either absent or available as well
     */
    private boolean taskIsAvailable() {
        if (!this.recordWriter.isAvailable()) {
            return false;
        }
        if (this.changelogWriterAvailabilityProvider == null) {
            return true;
        }
        return this.changelogWriterAvailabilityProvider.isAvailable();
    }

    /**
     * Returns a checker that reports {@code true} while the mailbox holds no mail and the task is
     * available according to {@code taskIsAvailable()}.
     *
     * @return the checker; it re-evaluates both conditions on every invocation
     */
    public CanEmitBatchOfRecordsChecker getCanEmitBatchOfRecords() {
        return () -> !this.mailboxProcessor.hasMail() && taskIsAvailable();
    }

    /**
     * Returns the current value of the {@code isRunning} flag.
     *
     * @return {@code true} while the task is marked as running
     */
    public final boolean isRunning() {
        return isRunning;
    }

    /**
     * Returns the current value of the {@code canceled} flag, which {@code cancel()} sets.
     *
     * @return {@code true} once the task has been canceled
     */
    public final boolean isCanceled() {
        return canceled;
    }

    /**
     * Returns the current value of the {@code failing} flag, which {@code cleanUp} sets when it is
     * called with a failure cause on a task that was not canceled.
     *
     * @return {@code true} if the task is marked as failing
     */
    public final boolean isFailing() {
        return failing;
    }

    /**
     * Shuts down the thread pool for asynchronous operations via {@code shutdownNow()} unless it
     * has already been shut down.
     *
     * @throws Exception declared for callers; nothing in this body throws a checked exception
     */
    private void shutdownAsyncThreads() throws Exception {
        if (!this.asyncOperationsThreadPool.isShutdown()) {
            this.asyncOperationsThreadPool.shutdownNow();
        }
    }

    /**
     * Releases the output side of the task.
     *
     * <p>Without an operator chain the record writer is closed directly; otherwise the operator
     * chain is closed through the action executor.
     *
     * @throws Exception if closing fails
     */
    private void releaseOutputResources() throws Exception {
        if (this.operatorChain == null) {
            this.recordWriter.close();
            return;
        }
        this.actionExecutor.run(() -> {
            this.operatorChain.close();
        });
    }

    /**
     * Closes all operators of the chain at most once.
     *
     * <p>Does nothing when there is no operator chain or when the operators were already closed.
     * The {@code closedOperators} flag is set before the chain is asked to close, so a failing
     * close is not retried by a later call.
     *
     * @throws Exception if closing the operators fails
     */
    private void closeAllOperators() throws Exception {
        if (this.operatorChain != null && !this.closedOperators) {
            this.closedOperators = true;
            this.operatorChain.closeAllOperators();
        }
    }

    /**
     * Last-resort cleanup run by the garbage collector: shuts down both timer services if they
     * have not terminated yet and closes the cancelables registry.
     *
     * <p>Note that {@code Object.finalize()} is deprecated since Java 9.
     *
     * @throws Throwable if the superclass finalizer or any of the shutdown calls fails
     */
    @Override
    protected void finalize() throws Throwable {
        super.finalize();

        TimerService userTimers = this.timerService;
        if (!userTimers.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            userTimers.shutdownService();
        }

        TimerService systemTimers = this.systemTimerService;
        if (!systemTimers.isTerminated()) {
            LOG.info("System timer service is shutting down.");
            systemTimers.shutdownService();
        }

        this.cancelables.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the configured time characteristic is event time or ingestion time.
     *
     * @return {@code true} for {@code TimeCharacteristic.EventTime} and
     *     {@code TimeCharacteristic.IngestionTime}, {@code false} otherwise
     */
    public boolean isSerializingTimestamps() {
        TimeCharacteristic characteristic = this.configuration.getTimeCharacteristic();
        return characteristic == TimeCharacteristic.EventTime
                || characteristic == TimeCharacteristic.IngestionTime;
    }

    /**
     * Returns the task name including subtask information, as provided by the environment's task
     * info.
     *
     * @return the value of {@code getTaskNameWithSubtasks()} of this task's {@code TaskInfo}
     */
    public final String getName() {
        return environment.getTaskInfo().getTaskNameWithSubtasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a display name of the form {@code "<task name with subtasks> (<execution id>)"}.
     *
     * @return the task name followed by the execution id in parentheses
     */
    public String getTaskNameWithSubtaskAndId() {
        return getName() + " (" + getEnvironment().getExecutionId() + ')';
    }

    /**
     * Returns the checkpoint storage view held by the subtask checkpoint coordinator.
     *
     * @return the coordinator's checkpoint storage
     */
    public CheckpointStorageWorkerView getCheckpointStorage() {
        return subtaskCheckpointCoordinator.getCheckpointStorage();
    }

    /**
     * Returns the stream configuration of this task.
     *
     * @return the {@code configuration} field
     */
    public StreamConfig getConfiguration() {
        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Returns the stream outputs of the operator chain.
     *
     * @return the array obtained from {@code operatorChain.getStreamOutputs()}
     */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return operatorChain.getStreamOutputs();
    }

    /**
     * Schedules a checkpoint trigger on the main mailbox executor.
     *
     * <p>The options are validated synchronously first. Inside the mailbox action the checkpoint
     * is triggered directly when every input gate is finished; otherwise barriers are injected
     * into the unfinished channels.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @return a future completed with the trigger result, or completed exceptionally with the
     *     exception raised by the mailbox action (which is also rethrown inside the mailbox)
     * @throws IllegalStateException if the state backend does not support the requested snapshot
     */
    public CompletableFuture<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        checkForcedFullSnapshotSupport(checkpointOptions);
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        this.mainMailboxExecutor.execute(() -> {
            try {
                boolean allInputsFinished = Arrays.stream(getEnvironment().getAllInputGates()).allMatch(gate -> gate.isFinished());
                boolean triggered = allInputsFinished
                        ? triggerCheckpointAsyncInMailbox(checkpointMetaData, checkpointOptions)
                        : triggerUnfinishedChannelsCheckpoint(checkpointMetaData, checkpointOptions);
                result.complete(Boolean.valueOf(triggered));
            } catch (Exception e) {
                result.completeExceptionally(e);
                throw e;
            }
        }, "checkpoint %s with %s", new Object[]{checkpointMetaData, checkpointOptions});
        return result;
    }

    /**
     * Performs a checkpoint that was triggered through the mailbox.
     *
     * <p>Records the start delay (current wall-clock time minus the checkpoint timestamp, never
     * negative, converted from milliseconds to nanoseconds), initializes the inputs checkpoint on
     * the subtask checkpoint coordinator and performs the checkpoint. If the checkpoint is not
     * performed, it is declined. User-triggered system exits are monitored for the current thread
     * for the duration of the call.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @return {@code true} if the checkpoint was performed; {@code false} if it was declined or if
     *     it failed while the task was no longer running
     * @throws Exception wrapping the original failure if the checkpoint fails while the task is
     *     still running
     */
    private boolean triggerCheckpointAsyncInMailbox(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            long startDelayMillis = Math.max(0L, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
            this.latestAsyncCheckpointStartDelayNanos = 1_000_000L * startDelayMillis;
            CheckpointMetricsBuilder metrics = new CheckpointMetricsBuilder()
                    .setAlignmentDurationNanos(0L)
                    .setBytesProcessedDuringAlignment(0L)
                    .setCheckpointStartDelayNanos(this.latestAsyncCheckpointStartDelayNanos);
            this.subtaskCheckpointCoordinator.initInputsCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
            boolean performed = performCheckpoint(checkpointMetaData, checkpointOptions, metrics);
            if (!performed) {
                declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            return performed;
        } catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
            }
            // A failure after the task stopped running is expected and only logged.
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), getName(), e});
            return false;
        } finally {
            // Single exit point for un-monitoring, executed exactly once on every path.
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

    /**
     * Triggers a checkpoint by feeding a checkpoint barrier into every unfinished channel of every
     * unfinished input gate.
     *
     * @param checkpointMetaData id and timestamp used to build the barrier
     * @param checkpointOptions options carried by the barrier
     * @return always {@code true}
     * @throws IllegalStateException if no checkpoint barrier handler is available
     * @throws Exception if processing a barrier fails
     */
    private boolean triggerUnfinishedChannelsCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        Optional<CheckpointBarrierHandler> barrierHandler = getCheckpointBarrierHandler();
        Preconditions.checkState(barrierHandler.isPresent(), "CheckpointBarrier should exist for tasks with network inputs.");
        CheckpointBarrier barrier = new CheckpointBarrier(checkpointMetaData.getCheckpointId(), checkpointMetaData.getTimestamp(), checkpointOptions);
        for (IndexedInputGate gate : getEnvironment().getAllInputGates()) {
            if (gate.isFinished()) {
                continue;
            }
            for (Object channel : gate.getUnfinishedChannels()) {
                barrierHandler.get().processBarrier(barrier, (InputChannelInfo) channel, true);
            }
        }
        return true;
    }

    /**
     * Returns the checkpoint barrier handler of this task, if it has one.
     *
     * <p>This implementation always returns an empty {@code Optional}. The method is
     * {@code protected} and not {@code final}, so subclasses can provide a handler;
     * {@code triggerUnfinishedChannelsCheckpoint} requires one to be present.
     *
     * @return an empty {@code Optional} in this implementation
     */
    protected Optional<CheckpointBarrierHandler> getCheckpointBarrierHandler() {
        return Optional.empty();
    }

    /**
     * Performs a checkpoint in reaction to a received barrier.
     *
     * <p>User-triggered system exits are monitored for the current thread for the duration of the
     * call.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @param checkpointMetricsBuilder metrics collected for the checkpoint
     * @throws IOException wrapping any failure of the checkpoint other than a task cancellation
     * @throws CancelTaskException rethrown unchanged (after being logged) when the task is
     *     cancelled while checkpointing
     */
    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetricsBuilder) throws IOException {
        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
        try {
            performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetricsBuilder);
        } catch (CancelTaskException e) {
            // Must be caught before the generic handler below; otherwise a cancellation would be
            // wrapped into an IOException and reported as a checkpoint failure.
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(checkpointMetaData.getCheckpointId()));
            throw e;
        } catch (Exception e) {
            throw new IOException("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
        } finally {
            FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
        }
    }

    /**
     * Aborts the given checkpoint in reaction to a barrier.
     *
     * @param j id of the checkpoint to abort
     * @param checkpointException the reason for the abort
     * @throws FlinkRuntimeException if the checkpoint is the current synchronous savepoint
     * @throws IOException if the subtask checkpoint coordinator fails to abort the checkpoint
     */
    public void abortCheckpointOnBarrier(long j, CheckpointException checkpointException) throws IOException {
        boolean abortsSyncSavepoint = isCurrentSyncSavepoint(j);
        if (!abortsSyncSavepoint) {
            this.subtaskCheckpointCoordinator.abortCheckpointOnBarrier(j, checkpointException, this.operatorChain);
            return;
        }
        throw new FlinkRuntimeException("Stop-with-savepoint failed.");
    }

    /**
     * Performs the checkpoint if the task is running, otherwise broadcasts a cancel marker.
     *
     * <p>While running, and inside the action executor, the method registers a synchronous
     * savepoint if the snapshot type requires it, remembers the checkpoint id as the minimum
     * final checkpoint id when end of data was already received (and none was remembered yet),
     * and delegates the snapshot to the subtask checkpoint coordinator.
     *
     * @param checkpointMetaData id and timestamp of the checkpoint
     * @param checkpointOptions options of the checkpoint
     * @param checkpointMetricsBuilder metrics collected for the checkpoint
     * @return {@code true} if the checkpoint was handed to the coordinator, {@code false} if the
     *     task was not running and a {@code CancelCheckpointMarker} was broadcast instead
     * @throws Exception if the checkpoint or the broadcast fails
     */
    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetricsBuilder checkpointMetricsBuilder) throws Exception {
        SnapshotType snapshotType = checkpointOptions.getCheckpointType();
        LOG.debug("Starting checkpoint {} {} on task {}", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), snapshotType, getName()});

        if (!this.isRunning) {
            // Not running any more: tell downstream tasks to cancel this checkpoint.
            this.actionExecutor.runThrowing(() -> {
                this.recordWriter.broadcastEvent(new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()));
            });
            return false;
        }

        this.actionExecutor.runThrowing(() -> {
            if (isSynchronous(snapshotType)) {
                setSynchronousSavepoint(checkpointMetaData.getCheckpointId());
            }
            boolean firstCheckpointAfterEndOfData = areCheckpointsWithFinishedTasksEnabled() && this.endOfDataReceived && this.finalCheckpointMinId == null;
            if (firstCheckpointAfterEndOfData) {
                this.finalCheckpointMinId = Long.valueOf(checkpointMetaData.getCheckpointId());
            }
            this.subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetricsBuilder, this.operatorChain, this.finishedOperators, this::isRunning);
        });
        return true;
    }

    /**
     * Tells whether the snapshot type denotes a synchronous savepoint.
     *
     * @param snapshotType the snapshot type to inspect
     * @return {@code true} if the type is a savepoint and that savepoint type is synchronous
     */
    private boolean isSynchronous(SnapshotType snapshotType) {
        if (!snapshotType.isSavepoint()) {
            return false;
        }
        return ((SavepointType) snapshotType).isSynchronous();
    }

    /**
     * Verifies that the configured state backend can take the requested kind of snapshot.
     *
     * @param checkpointOptions options whose snapshot type is validated
     * @throws IllegalStateException if a full checkpoint is requested but the state backend does
     *     not support the no-claim restore mode, or if a savepoint is requested in a format the
     *     state backend does not support
     */
    private void checkForcedFullSnapshotSupport(CheckpointOptions checkpointOptions) {
        SnapshotType snapshotType = checkpointOptions.getCheckpointType();
        if (snapshotType.equals(CheckpointType.FULL_CHECKPOINT) && !this.stateBackend.supportsNoClaimRestoreMode()) {
            throw new IllegalStateException(String.format("Configured state backend (%s) does not support enforcing a full snapshot. If you are restoring in %s mode, please consider choosing either %s or %s restore mode.", this.stateBackend, RestoreMode.NO_CLAIM, RestoreMode.CLAIM, RestoreMode.LEGACY));
        }
        if (snapshotType.isSavepoint()) {
            // The snapshot type is only known to be a savepoint type after the check above,
            // so the downcast has to be explicit.
            SavepointType savepointType = (SavepointType) snapshotType;
            if (!this.stateBackend.supportsSavepointFormat(savepointType.getFormatType())) {
                throw new IllegalStateException(String.format("Configured state backend (%s) does not support %s savepoints", this.stateBackend, savepointType.getFormatType()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Declines the given checkpoint through the environment with the failure reason
     * {@code CHECKPOINT_DECLINED_TASK_NOT_READY}.
     *
     * @param j id of the checkpoint to decline
     */
    public void declineCheckpoint(long j) {
        // The separator keeps the task name readable in the resulting message.
        CheckpointException reason = new CheckpointException("Task name: " + getName(), CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);
        getEnvironment().declineCheckpoint(j, reason);
    }

    /**
     * Returns the executor service used for asynchronous operations of this task.
     *
     * @return the {@code asyncOperationsThreadPool} field
     */
    public final ExecutorService getAsyncOperationsThreadPool() {
        return asyncOperationsThreadPool;
    }

    /**
     * Schedules the completion notification for a checkpoint on the mailbox.
     *
     * @param j id of the completed checkpoint
     * @return a future that completes once the notification has been processed
     */
    public Future<Void> notifyCheckpointCompleteAsync(long j) {
        String description = String.format("checkpoint %d complete", Long.valueOf(j));
        return notifyCheckpointOperation(() -> notifyCheckpointComplete(j), description);
    }

    /**
     * Schedules the abort notification for a checkpoint on the mailbox.
     *
     * <p>When {@code j2} is positive, the completion of checkpoint {@code j2} is notified first.
     * Aborting the current synchronous savepoint fails the mailbox action with a
     * {@code FlinkRuntimeException}.
     *
     * @param j id of the aborted checkpoint
     * @param j2 id of a checkpoint to report as completed beforehand; ignored unless positive
     * @return a future that completes once the notification has been processed, exceptionally if
     *     the action failed
     */
    public Future<Void> notifyCheckpointAbortAsync(long j, long j2) {
        String description = String.format("checkpoint %d aborted", Long.valueOf(j));
        return notifyCheckpointOperation(() -> {
            if (j2 > 0) {
                notifyCheckpointComplete(j2);
            }
            boolean abortsSyncSavepoint = isCurrentSyncSavepoint(j);
            if (abortsSyncSavepoint) {
                throw new FlinkRuntimeException("Stop-with-savepoint failed.");
            }
            this.subtaskCheckpointCoordinator.notifyCheckpointAborted(j, this.operatorChain, this::isRunning);
        }, description);
    }

    /**
     * Schedules the subsumption notification for a checkpoint on the mailbox.
     *
     * @param j id of the subsumed checkpoint
     * @return a future that completes once the notification has been processed
     */
    public Future<Void> notifyCheckpointSubsumedAsync(long j) {
        String description = String.format("checkpoint %d subsumed", Long.valueOf(j));
        return notifyCheckpointOperation(() -> {
            this.subtaskCheckpointCoordinator.notifyCheckpointSubsumed(j, this.operatorChain, this::isRunning);
        }, description);
    }

    /**
     * Runs a checkpoint notification on the mailbox executor obtained for
     * {@code TaskMailbox.MAX_PRIORITY} and exposes its outcome as a future.
     *
     * @param runnableWithException the notification to run
     * @param str description passed to the mailbox executor along with the action
     * @return a future completed with {@code null} on success, or exceptionally with the exception
     *     thrown by the notification (which is also rethrown inside the mailbox)
     */
    private Future<Void> notifyCheckpointOperation(RunnableWithException runnableWithException, String str) {
        CompletableFuture<Void> outcome = new CompletableFuture<>();
        this.mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
            try {
                runnableWithException.run();
                outcome.complete(null);
            } catch (Exception e) {
                outcome.completeExceptionally(e);
                throw e;
            }
        }, str);
        return outcome;
    }

    /**
     * Handles the completion of a checkpoint.
     *
     * <p>Notifications for ids not greater than the latest reported id are ignored. Otherwise the
     * id is recorded and the subtask checkpoint coordinator is notified. If the task is still
     * running, {@code finalCheckpointCompleted} is completed when the checkpoint is the current
     * synchronous savepoint, or when no synchronous savepoint is pending and the id is at least
     * the recorded minimum final checkpoint id.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if the coordinator notification fails
     */
    private void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Notify checkpoint {} complete on task {}", Long.valueOf(j), getName());
        if (j <= this.latestReportCheckpointId) {
            return;
        }
        this.latestReportCheckpointId = j;
        this.subtaskCheckpointCoordinator.notifyCheckpointComplete(j, this.operatorChain, this::isRunning);

        if (!this.isRunning) {
            return;
        }
        boolean completesSyncSavepoint = isCurrentSyncSavepoint(j);
        boolean completesFinalCheckpoint = !completesSyncSavepoint
                && this.syncSavepoint == null
                && this.finalCheckpointMinId != null
                && j >= this.finalCheckpointMinId.longValue();
        if (completesSyncSavepoint || completesFinalCheckpoint) {
            this.finalCheckpointCompleted.complete(null);
        }
    }

    /**
     * Attempts to shut down both timer services, using the
     * {@code TASK_CANCELLATION_TIMEOUT_TIMERS} value from the task manager configuration as the
     * time limit for each of them.
     */
    private void tryShutdownTimerService() {
        long timeoutMs = getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
        tryShutdownTimerService(timeoutMs, this.timerService);
        tryShutdownTimerService(timeoutMs, this.systemTimerService);
    }

    /**
     * Shuts down a single timer service unless it has already terminated, and logs a warning if
     * the shutdown reports that it did not succeed within the limit.
     *
     * @param j time limit handed to {@code shutdownServiceUninterruptible}; logged as milliseconds
     * @param timerService the timer service to shut down
     */
    private void tryShutdownTimerService(long j, TimerService timerService) {
        if (!timerService.isTerminated() && !timerService.shutdownServiceUninterruptible(j)) {
            LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", Long.valueOf(j));
        }
    }

    /**
     * Forwards an operator event to the operator chain through the main mailbox executor.
     *
     * <p>Delivery is best-effort: if the mailbox executor rejects the action, the event is dropped
     * and the rejection is logged at debug level instead of being propagated.
     *
     * @param operatorID id of the operator the event is addressed to
     * @param serializedValue the serialized event
     * @throws FlinkException declared for callers; not thrown by this body directly
     */
    public void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
        try {
            this.mainMailboxExecutor.execute(() -> {
                this.operatorChain.dispatchOperatorEvent(operatorID, serializedValue);
            }, "dispatch operator event");
        } catch (RejectedExecutionException e) {
            // Presumably the mailbox is already closed because the task is shutting down, so
            // dropping the event is intended; leave a trace for troubleshooting.
            LOG.debug("Operator event for operator {} was rejected by the mailbox and is dropped.", operatorID, e);
        }
    }

    /**
     * Resolves the state backend for this task.
     *
     * <p>Combines the state backend from the stream configuration with the optional
     * {@code ENABLE_CHANGE_LOG_FOR_APPLICATION} job setting (undefined when absent) and the task
     * manager configuration via {@code StateBackendLoader.fromApplicationOrConfigOrDefault}.
     *
     * @return the resolved state backend
     * @throws Exception if loading the state backend fails
     */
    private StateBackend createStateBackend() throws Exception {
        StateBackend fromApplication = this.configuration.getStateBackend(getUserCodeClassLoader());
        Optional<?> changelogSetting = this.environment.getJobConfiguration().getOptional(StateChangelogOptionsInternal.ENABLE_CHANGE_LOG_FOR_APPLICATION);
        TernaryBoolean changelogEnabled;
        if (changelogSetting.isPresent()) {
            changelogEnabled = TernaryBoolean.fromBoolean(((Boolean) changelogSetting.get()).booleanValue());
        } else {
            changelogEnabled = TernaryBoolean.UNDEFINED;
        }
        return StateBackendLoader.fromApplicationOrConfigOrDefault(
                fromApplication,
                changelogEnabled,
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getUserCodeClassLoader(),
                LOG);
    }

    /**
     * Loads the checkpoint storage from the stream configuration's checkpoint storage and
     * savepoint directory, the given state backend and the task manager configuration.
     *
     * @param stateBackend the state backend handed to the loader
     * @return the loaded checkpoint storage
     * @throws Exception if loading fails
     */
    private CheckpointStorage createCheckpointStorage(StateBackend stateBackend) throws Exception {
        return CheckpointStorageLoader.load(
                this.configuration.getCheckpointStorage(getUserCodeClassLoader()),
                this.configuration.getSavepointDir(getUserCodeClassLoader()),
                stateBackend,
                getEnvironment().getTaskManagerInfo().getConfiguration(),
                getUserCodeClassLoader(),
                LOG);
    }

    /**
     * Exposes the timer service to tests.
     *
     * @return the {@code timerService} field
     */
    @VisibleForTesting
    TimerService getTimerService() {
        return timerService;
    }

    /**
     * Exposes the main operator to tests.
     *
     * @return the {@code mainOperator} field
     */
    @VisibleForTesting
    OP getMainOperator() {
        return mainOperator;
    }

    /**
     * Exposes the action executor to tests.
     *
     * @return the {@code actionExecutor} field
     */
    @VisibleForTesting
    StreamTaskActionExecutor getActionExecutor() {
        return actionExecutor;
    }

    /**
     * Returns a factory that, for a given mailbox executor, creates a
     * {@code ProcessingTimeServiceImpl} on top of this task's timer service whose callbacks are
     * wrapped by {@code deferCallbackToMailbox} for that executor.
     *
     * @return the processing time service factory
     */
    public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
        return mailboxExecutor -> new ProcessingTimeServiceImpl(this.timerService, callback -> {
            return deferCallbackToMailbox(mailboxExecutor, callback);
        });
    }

    /**
     * Forwards an asynchronous failure to the async exception handler, but only while the task is
     * restoring or running; otherwise the failure is ignored.
     *
     * @param str message describing the failure
     * @param th the failure
     */
    public void handleAsyncException(String str, Throwable th) {
        boolean active = this.isRestoring || this.isRunning;
        if (!active) {
            return;
        }
        this.asyncExceptionHandler.handleAsyncException(str, th);
    }

    /**
     * Returns the task name, identical to {@link #getName()}.
     *
     * @return the task name including subtask information
     */
    @Override
    public String toString() {
        String taskName = getName();
        return taskName;
    }

    /**
     * Returns the registry of cancelables, which {@code cancel()} closes once the completion
     * future completes.
     *
     * @return the {@code cancelables} field
     */
    public final CloseableRegistry getCancelables() {
        return cancelables;
    }

    /**
     * Creates the record writer delegate that matches the number of record writers built for the
     * given configuration: {@code NonRecordWriter} for none, {@code SingleRecordWriter} for exactly
     * one and {@code MultipleRecordWriters} otherwise.
     *
     * @param streamConfig configuration describing the non-chained outputs
     * @param environment environment providing the result partition writers
     * @param <OUT> type of the emitted records
     * @return the delegate wrapping all record writers
     */
    @VisibleForTesting
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig streamConfig, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> writers = createRecordWriters(streamConfig, environment);
        switch (writers.size()) {
            case 0:
                return new NonRecordWriter();
            case 1:
                return new SingleRecordWriter(writers.get(0));
            default:
                return new MultipleRecordWriters(writers);
        }
    }

    /**
     * Creates one record writer per non-chained output of the vertex, in the order the outputs
     * are listed in the configuration.
     *
     * <p>Before each writer is created, a forward partitioner may be replaced, see
     * {@code replaceForwardPartitionerIfConsumerParallelismDoesNotMatch}.
     *
     * @param streamConfig configuration describing the non-chained outputs
     * @param environment environment providing class loader, task info and partition writers
     * @param <OUT> type of the emitted records
     * @return the record writers, indexed like the outputs
     */
    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig streamConfig, Environment environment) {
        List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> writers = new ArrayList<>();
        int outputIndex = 0;
        for (NonChainedOutput output : streamConfig.getVertexNonChainedOutputs(environment.getUserCodeClassLoader().asClassLoader())) {
            replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(environment, output, outputIndex);
            writers.add(createRecordWriter(output, outputIndex, environment, environment.getTaskInfo().getTaskNameWithSubtasks(), output.getBufferTimeout()));
            outputIndex++;
        }
        return writers;
    }

    /**
     * Swaps a forward partitioner for a {@code RebalancePartitioner} when the number of
     * subpartitions of the output's writer differs from the number of parallel subtasks.
     *
     * @param environment environment providing the writer and the task info
     * @param nonChainedOutput the output whose partitioner may be replaced
     * @param i index of the output's result partition writer
     */
    private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(Environment environment, NonChainedOutput nonChainedOutput, int i) {
        if (!(nonChainedOutput.getPartitioner() instanceof ForwardPartitioner)) {
            return;
        }
        if (environment.getWriter(i).getNumberOfSubpartitions() == environment.getTaskInfo().getNumberOfParallelSubtasks()) {
            return;
        }
        LOG.debug("Replacing forward partitioner with rebalance for {}", environment.getTaskInfo().getTaskNameWithSubtasks());
        nonChainedOutput.setPartitioner(new RebalancePartitioner());
    }

    /**
     * Builds the record writer for one non-chained output.
     *
     * <p>The output's partitioner is cloned with the user code class loader. If the clone is a
     * {@code ConfigurableStreamPartitioner} and the writer reports a positive number of target key
     * groups, the partitioner is configured with that number. The writer is attached to the IO
     * metric group of the environment.
     *
     * @param nonChainedOutput the output to create the writer for
     * @param i index of the output's result partition writer
     * @param environment environment providing class loader, writer and metric group
     * @param str task name used for logging and for the writer
     * @param j timeout set on the record writer builder
     * @param <OUT> type of the emitted records
     * @return the configured record writer
     */
    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(NonChainedOutput nonChainedOutput, int i, Environment environment, String str, long j) {
        StreamPartitioner partitioner = null;
        try {
            partitioner = (StreamPartitioner) InstantiationUtil.clone(nonChainedOutput.getPartitioner(), environment.getUserCodeClassLoader().asClassLoader());
        } catch (Exception cloneFailure) {
            ExceptionUtils.rethrow(cloneFailure);
        }
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{partitioner, Integer.valueOf(i), str});

        ResultPartitionWriter partitionWriter = environment.getWriter(i);
        if (partitioner instanceof ConfigurableStreamPartitioner) {
            int targetKeyGroups = partitionWriter.getNumTargetKeyGroups();
            if (targetKeyGroups > 0) {
                ((ConfigurableStreamPartitioner) partitioner).configure(targetKeyGroups);
            }
        }

        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter = new RecordWriterBuilder().setChannelSelector(partitioner).setTimeout(j).setTaskName(str).build(partitionWriter);
        recordWriter.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return recordWriter;
    }

    /**
     * Reports a timer failure as an asynchronous exception, wrapped in a {@code TimerException}.
     *
     * @param exc the exception raised while processing a timer
     */
    private void handleTimerException(Exception exc) {
        TimerException wrapped = new TimerException(exc);
        handleAsyncException("Caught exception while processing timer.", wrapped);
    }

    /**
     * Wraps a processing-time callback so that, when fired, it is not run directly but enqueued
     * on the given mailbox executor.
     *
     * @param mailboxExecutor executor that will run the actual callback
     * @param processingTimeCallback the callback to defer
     * @return a callback that submits the original callback, with the fired timestamp, to the
     *     mailbox executor
     */
    @VisibleForTesting
    ProcessingTimeService.ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
        return timestamp -> mailboxExecutor.execute(
                () -> invokeProcessingTimeCallback(processingTimeCallback, timestamp),
                "Timer callback for %s @ %d",
                new Object[]{processingTimeCallback, Long.valueOf(timestamp)});
    }

    /**
     * Invokes a processing-time callback and reports anything it throws as an asynchronous
     * exception wrapped in a {@code TimerException}; nothing is propagated to the caller.
     *
     * @param processingTimeCallback the callback to invoke
     * @param j the timestamp passed to the callback
     */
    private void invokeProcessingTimeCallback(ProcessingTimeService.ProcessingTimeCallback processingTimeCallback, long j) {
        try {
            processingTimeCallback.onProcessingTime(j);
        } catch (Throwable callbackFailure) {
            TimerException wrapped = new TimerException(callbackFailure);
            handleAsyncException("Caught exception while processing timer.", wrapped);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the start delay, in nanoseconds, recorded by the most recent checkpoint that was
     * triggered through the mailbox.
     *
     * @return the {@code latestAsyncCheckpointStartDelayNanos} field
     */
    public long getAsyncCheckpointStartDelayNanos() {
        return latestAsyncCheckpointStartDelayNanos;
    }

    /**
     * Reports whether this task uses non-blocking input.
     *
     * @return always {@code true} in this implementation
     */
    public boolean isUsingNonBlockingInput() {
        return true;
    }

    /**
     * Clears the {@code shouldInterruptOnCancel} flag so that later calls to
     * {@code maybeInterruptOnCancel} no longer interrupt the given thread.
     *
     * <p>The flag is written while holding {@code shouldInterruptOnCancelLock}, the same lock
     * under which {@code maybeInterruptOnCancel} reads it.
     */
    private void disableInterruptOnCancel() {
        synchronized (this.shouldInterruptOnCancelLock) {
            this.shouldInterruptOnCancel = false;
        }
    }

    /**
     * Interrupts the given thread if interrupt-on-cancel is still enabled.
     *
     * <p>When both optional arguments are present, the thread's stack trace is logged via
     * {@code Task.logTaskThreadStackTrace} before the interrupt. The check and the interrupt
     * happen while holding {@code shouldInterruptOnCancelLock}.
     *
     * @param thread the thread to interrupt
     * @param str optional value passed on to the stack trace logging, may be {@code null}
     * @param l optional value passed on to the stack trace logging, may be {@code null}
     */
    public void maybeInterruptOnCancel(Thread thread, @Nullable String str, @Nullable Long l) {
        synchronized (this.shouldInterruptOnCancelLock) {
            if (!this.shouldInterruptOnCancel) {
                return;
            }
            boolean hasLogContext = str != null && l != null;
            if (hasLogContext) {
                Task.logTaskThreadStackTrace(thread, str, l.longValue(), "interrupting");
            }
            thread.interrupt();
        }
    }

    /**
     * Returns the environment this task runs in.
     *
     * @return the {@code environment} field
     */
    @Override // org.apache.flink.streaming.runtime.tasks.EnvironmentProvider
    public final Environment getEnvironment() {
        return environment;
    }

    /**
     * Synthetic hook used when a serialized lambda of this class is deserialized.
     *
     * <p>The only lambda recognized is the method reference
     * {@code InternalTimeServiceManagerImpl::create} used as an
     * {@code InternalTimeServiceManager.Provider}; every other request is rejected.
     *
     * <p>NOTE(review): this is decompiler output for a compiler-generated method. As written it is
     * not valid Java source ({@code boolean z = -1}, a {@code switch} over a {@code boolean}, and a
     * case label that presumably only coincides with the constant 0), so it should presumably be
     * removed and left to the compiler to regenerate rather than maintained by hand - confirm
     * before touching it.
     *
     * @param serializedLambda description of the lambda being deserialized
     * @return the re-created lambda
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First stage: map the implementation method name to an index (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case -1352294148:
                if (implMethodName.equals("create")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second stage: verify the full lambda metadata before handing out the method reference.
        switch (z) {
            case WatermarkStatus.ACTIVE_STATUS /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/streaming/api/operators/InternalTimeServiceManager$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;Lorg/apache/flink/streaming/runtime/tasks/StreamTaskCancellationContext;)Lorg/apache/flink/streaming/api/operators/InternalTimeServiceManager;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/runtime/state/CheckpointableKeyedStateBackend;Ljava/lang/ClassLoader;Lorg/apache/flink/streaming/api/operators/KeyContext;Lorg/apache/flink/streaming/runtime/tasks/ProcessingTimeService;Ljava/lang/Iterable;Lorg/apache/flink/streaming/runtime/tasks/StreamTaskCancellationContext;)Lorg/apache/flink/streaming/api/operators/InternalTimeServiceManagerImpl;")) {
                    return InternalTimeServiceManagerImpl::create;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
