package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.class */
class StreamTaskCancellationTest {

    /**
     * JUnit 5 extension supplying the {@link ScheduledExecutorService} that the task-level tests
     * in this class pass to {@code StreamTaskTest.createTask(...)}.
     */
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest$CancelFailingTask.class */
    /**
     * Test task whose {@code cancelTask()} always fails with an exception.
     *
     * <p>Its default action parks behind a lock owned by a helper thread that is registered with
     * the task's cancelables, so the action can only finish once that helper has been closed.
     */
    public static class CancelFailingTask extends StreamTask<String, AbstractStreamOperator<String>> {
        /**
         * Triggered by {@code processInput} after the helper thread has signalled. The test
         * assigns a fresh latch before starting the task and awaits it before cancelling.
         */
        private static OneShotLatch syncLatch;

        public CancelFailingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() {
            // Nothing to initialize for this test task.
        }

        /**
         * Starts a {@link LockHolder}, registers it as a cancelable, and then blocks on the lock
         * that the holder owns until the holder gives it up.
         *
         * @param controller used to suspend the default action once the lock could be acquired
         * @throws Exception if registering the holder or waiting on the latch fails
         */
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            final Object sharedLock = new Object();
            final OneShotLatch holderSignal = new OneShotLatch();
            final LockHolder holder = new LockHolder(sharedLock, holderSignal);
            holder.start();
            try {
                getCancelables().registerCloseable(holder);

                // Wait for the holder's signal, which it sends from inside its synchronized block.
                holderSignal.await();

                // Let the test know that it may now cancel the task.
                syncLatch.trigger();

                // Deliberately empty: entering the monitor blocks until the holder thread has
                // left its own synchronized block, which it only does after being canceled.
                synchronized (sharedLock) {
                }

                controller.suspendDefaultAction();
                mailboxProcessor.suspend();
            } finally {
                holder.close();
            }
        }

        protected void cleanUpInternal() {
            // Nothing to clean up.
        }

        /** Always fails, simulating an error while cancelling the task. */
        protected void cancelTask() throws Exception {
            throw new Exception("test exception");
        }

        /**
         * Thread that grabs a lock, signals a latch, and then sleeps while holding the lock until
         * it is canceled.
         */
        private static final class LockHolder extends Thread implements Closeable {
            private final Object lock;
            private final OneShotLatch trigger;
            private volatile boolean canceled;

            private LockHolder(Object lockToHold, OneShotLatch signal) {
                lock = lockToHold;
                trigger = signal;
            }

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                synchronized (lock) {
                    while (!canceled) {
                        // Signal (on every iteration) that the lock is currently held.
                        trigger.trigger();
                        try {
                            // Effectively freeze this thread while it owns the lock.
                            Thread.sleep(1_000_000_000L);
                        } catch (InterruptedException ignored) {
                            // Intentionally ignored: the loop condition decides whether to stop.
                        }
                    }
                }
            }

            /** Marks the holder as canceled without waking it up. */
            public void cancel() {
                canceled = true;
            }

            /** Marks the holder as canceled and interrupts its sleep so that it releases the lock. */
            @Override // java.io.Closeable, java.lang.AutoCloseable
            public void close() {
                canceled = true;
                interrupt();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest$CancelThrowingTask.class */
    /**
     * Test task whose default action throws a {@link CancelTaskException} as soon as it is
     * invoked.
     */
    public static class CancelThrowingTask extends StreamTask<String, AbstractStreamOperator<String>> {

        public CancelThrowingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void init() {
            // No initialization required for this test task.
        }

        /**
         * Never processes any input.
         *
         * @param controller unused
         * @throws CancelTaskException always
         */
        protected void processInput(MailboxDefaultAction.Controller controller) {
            throw new CancelTaskException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest$TaskWithPreRegisteredTimers.class */
    /**
     * Operator that registers a fixed number of timers (one per key, all at timestamp 0) when it
     * is opened and forwards the key of every fired timer to a statically configured listener.
     */
    public static class TaskWithPreRegisteredTimers extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable<String, VoidNamespace> {
        /** Callback invoked with the key of each fired timer; must be set before timers fire. */
        private static ThrowingConsumer<String, Exception> onTimerListener;

        private final int numTimersToRegister;
        private final boolean processingTime;

        /**
         * @param numTimersToRegister number of timers registered in {@link #open()}
         * @param processingTime {@code true} to register processing-time timers, {@code false}
         *     to register event-time timers
         */
        private TaskWithPreRegisteredTimers(int numTimersToRegister, boolean processingTime) {
            this.numTimersToRegister = numTimersToRegister;
            this.processingTime = processingTime;
        }

        /** Registers one timer at timestamp 0 for each of the keys "key-0", "key-1", and so on. */
        public void open() throws Exception {
            final InternalTimerService<VoidNamespace> timerService =
                    getInternalTimerService("test-timers", VoidNamespaceSerializer.INSTANCE, this);
            final KeyedStateBackend<String> keyedStateBackend = getKeyedStateBackend();
            for (int timerIdx = 0; timerIdx < numTimersToRegister; timerIdx++) {
                // Timers are registered under whatever key is current on the state backend.
                keyedStateBackend.setCurrentKey("key-" + timerIdx);
                if (processingTime) {
                    timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, 0L);
                } else {
                    timerService.registerEventTimeTimer(VoidNamespace.INSTANCE, 0L);
                }
            }
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            // Input records are ignored; only timers matter for this operator.
        }

        /**
         * Forwards the key of a fired event-time timer to the listener.
         *
         * @throws IllegalStateException if the operator was configured for processing time
         */
        public void onEventTime(InternalTimer<String, VoidNamespace> internalTimer) throws Exception {
            Preconditions.checkState(!processingTime);
            notifyListener(internalTimer);
        }

        /**
         * Forwards the key of a fired processing-time timer to the listener.
         *
         * @throws IllegalStateException if the operator was configured for event time
         */
        public void onProcessingTime(InternalTimer<String, VoidNamespace> internalTimer) throws Exception {
            Preconditions.checkState(processingTime);
            notifyListener(internalTimer);
        }

        /** Passes the timer's key to the listener, failing if no listener has been set. */
        private static void notifyListener(InternalTimer<String, VoidNamespace> timer) throws Exception {
            Preconditions.checkNotNull(onTimerListener).accept(timer.getKey());
        }

        /** Installs the (non-null) listener that is shared by all instances of this operator. */
        private static void setOnTimerListener(ThrowingConsumer<String, Exception> throwingConsumer) {
            onTimerListener = Preconditions.checkNotNull(throwingConsumer);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest$TestInterruptInCloseOperator.class */
    /**
     * Operator whose {@code close()} asks the containing task to interrupt a helper thread and
     * asserts that no interrupt is delivered.
     */
    private static class TestInterruptInCloseOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private TestInterruptInCloseOperator() {
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            // Records are irrelevant for this test operator.
        }

        /**
         * Spawns a busy-spinning thread, requests an interrupt-on-cancel for it from the
         * containing task, and verifies that the thread's interrupt flag stays clear.
         */
        public void close() throws Exception {
            super.close();

            final AtomicBoolean keepSpinning = new AtomicBoolean(true);
            final Thread spinner = new Thread(() -> {
                while (keepSpinning.get()) {
                    // Spin without blocking so a delivered interrupt flag would stay observable.
                }
            });
            spinner.start();
            try {
                getContainingTask().maybeInterruptOnCancel(spinner, (String) null, (Long) null);
                Assertions.assertThat(spinner.isInterrupted()).isFalse();
            } finally {
                // Let the helper thread terminate regardless of the assertion outcome.
                keepSpinning.set(false);
            }
        }
    }

    /** Package-private no-argument constructor; performs no setup. */
    StreamTaskCancellationTest() {
    }

    /**
     * Builds a harness around {@link TestInterruptInCloseOperator} and closes it right away.
     *
     * <p>The actual check lives in the operator's {@code close()} method, which is presumably
     * invoked while the harness shuts the task down.
     */
    @Test
    void testDoNotInterruptWhileClosing() throws Exception {
        final TestInterruptInCloseOperator operator = new TestInterruptInCloseOperator();
        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(operator)
                        .build()) {
            // Intentionally empty: only building and closing the harness is under test.
        }
    }

    /**
     * Cancels a running {@link CancelFailingTask}, whose {@code cancelTask()} throws, and verifies
     * that the task thread still terminates and the task ends up in state {@code CANCELED}.
     */
    @Test
    void testCanceleablesCanceledOnCancelTaskError() throws Exception {
        CancelFailingTask.syncLatch = new OneShotLatch();
        final StreamConfig streamConfig = new StreamConfig(new Configuration());

        // try-with-resources closes the shuffle environment exactly once on every path.
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            final Task task =
                    StreamTaskTest.createTask(
                            CancelFailingTask.class,
                            shuffleEnvironment,
                            streamConfig,
                            new Configuration(),
                            EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();

            // Wait until the task signals that it is about to block on the held lock.
            CancelFailingTask.syncLatch.await();

            task.cancelExecution();
            task.getExecutingThread().join();

            Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        }
    }

    /**
     * Runs a {@link CancelThrowingTask}, whose default action throws a
     * {@link CancelTaskException}, and verifies that the task finishes in state {@code CANCELED}.
     */
    @Test
    void testCancelTaskExceptionHandling() throws Exception {
        final StreamConfig streamConfig = new StreamConfig(new Configuration());

        // try-with-resources closes the shuffle environment exactly once on every path.
        try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) {
            final Task task =
                    StreamTaskTest.createTask(
                            CancelThrowingTask.class,
                            shuffleEnvironment,
                            streamConfig,
                            new Configuration(),
                            EXECUTOR_RESOURCE.getExecutor());
            task.startTaskThread();
            task.getExecutingThread().join();

            Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.CANCELED);
        }
    }

    /** Runs the shared timer-cancellation scenario with event-time timers. */
    @Test
    void testCancelTaskShouldPreventAdditionalEventTimeTimersFromBeingFired() throws Exception {
        final boolean useProcessingTime = false;
        testCancelTaskShouldPreventAdditionalTimersFromBeingFired(useProcessingTime);
    }

    /** Runs the shared timer-cancellation scenario with processing-time timers. */
    @Test
    void testCancelTaskShouldPreventAdditionalProcessingTimeTimersFromBeingFired() throws Exception {
        final boolean useProcessingTime = true;
        testCancelTaskShouldPreventAdditionalTimersFromBeingFired(useProcessingTime);
    }

    /**
     * Shared scenario: an operator pre-registers 100 timers, the task is canceled from within the
     * timer callback once 10 timers have fired, and afterwards exactly 10 firings must have been
     * observed.
     *
     * @param processingTime {@code true} to use processing-time timers, {@code false} to use
     *     event-time timers
     * @throws Exception if building, driving, or closing the harness fails
     */
    private void testCancelTaskShouldPreventAdditionalTimersFromBeingFired(boolean processingTime) throws Exception {
        final int numTimersToRegister = 100;
        final int numTimersBeforeCancel = 10;
        final AtomicInteger numTimersFired = new AtomicInteger(0);

        // try-with-resources closes the harness exactly once, including when close() itself fails.
        try (StreamTaskMailboxTestHarness<String> harness =
                new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO)
                        .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                        .setKeyType(BasicTypeInfo.STRING_TYPE_INFO)
                        .setupOutputForSingletonOperatorChain(
                                new TaskWithPreRegisteredTimers(numTimersToRegister, processingTime))
                        .build()) {
            TaskWithPreRegisteredTimers.setOnTimerListener(
                    key -> {
                        if (numTimersFired.incrementAndGet() >= numTimersBeforeCancel) {
                            harness.cancel();
                        }
                    });

            harness.processElement(new Watermark(Long.MAX_VALUE));

            // For processing time, keep draining the mailbox until the first timer has fired.
            while (processingTime && numTimersFired.get() == 0) {
                harness.processAll();
            }
        }

        // Checked after the harness is closed, so a failing assertion cannot trigger a second close.
        Assertions.assertThat(numTimersFired).hasValue(numTimersBeforeCancel);
    }
}
