package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.VoidPermanentBlobService;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutorFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.SharedResources;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.BackendForTestStream;
import org.apache.flink.runtime.taskexecutor.KvStateService;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.class */
public class TaskCheckpointingBehaviourTest extends TestLogger {
    private static final OneShotLatch IN_CHECKPOINT_LATCH = new OneShotLatch();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$AsyncFailureInducingStateBackend.class */
    public static class AsyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -7613628662587098470L;
        public static final SnapshotStrategy<OperatorStateHandle, SnapshotResources> FAILING_STRATEGY = new SnapshotStrategy<OperatorStateHandle, SnapshotResources>() { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.AsyncFailureInducingStateBackend.2
            public SnapshotResources syncPrepareResources(long j) throws Exception {
                return null;
            }

            public SnapshotStrategy.SnapshotResultSupplier<OperatorStateHandle> asyncSnapshot(SnapshotResources snapshotResources, long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
                return closeableRegistry -> {
                    throw new Exception("Async part snapshot exception.");
                };
            }
        };

        private AsyncFailureInducingStateBackend() {
        }

        /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest$AsyncFailureInducingStateBackend$1] */
        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            return new DefaultOperatorStateBackendBuilder(operatorStateBackendParameters.getEnv().getUserCodeClassLoader().asClassLoader(), operatorStateBackendParameters.getEnv().getExecutionConfig(), true, operatorStateBackendParameters.getStateHandles(), operatorStateBackendParameters.getCancelStreamRegistry()) { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.AsyncFailureInducingStateBackend.1
                /* renamed from: build, reason: merged with bridge method [inline-methods] */
                public DefaultOperatorStateBackend m194build() {
                    CloseableRegistry closeableRegistry = new CloseableRegistry();
                    return new DefaultOperatorStateBackend(this.executionConfig, closeableRegistry, new HashMap(), new HashMap(), new HashMap(), new HashMap(), new SnapshotStrategyRunner("Failing strategy", AsyncFailureInducingStateBackend.FAILING_STRATEGY, closeableRegistry, SnapshotExecutionType.ASYNCHRONOUS));
                }
            }.m194build();
        }

        /* renamed from: configure, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public AsyncFailureInducingStateBackend m193configure(ReadableConfig readableConfig, ClassLoader classLoader) {
            return this;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$FilterOperator.class */
    private static final class FilterOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1;

        public FilterOperator() {
            super(new FilterFunction<Object>() { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.FilterOperator.1
                public boolean filter(Object obj) {
                    return false;
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$LockingOutputStream.class */
    public static final class LockingOutputStream extends CheckpointStateOutputStream {
        private final Object lock;
        private volatile boolean closed;

        private LockingOutputStream() {
            this.lock = new Object();
        }

        @Nullable
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new UnsupportedOperationException();
        }

        public void write(int i) throws IOException {
            synchronized (this.lock) {
                while (!this.closed) {
                    try {
                        this.lock.wait();
                    } catch (InterruptedException e) {
                    }
                }
            }
        }

        public void close() throws IOException {
            synchronized (this.lock) {
                this.closed = true;
                this.lock.notifyAll();
            }
        }

        public long getPos() {
            return 0L;
        }

        public void flush() {
        }

        public void sync() {
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$SyncFailureInducingStateBackend.class */
    private static class SyncFailureInducingStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = -1915780414440060539L;

        private SyncFailureInducingStateBackend() {
        }

        /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest$SyncFailureInducingStateBackend$1] */
        public OperatorStateBackend createOperatorStateBackend(StateBackend.OperatorStateBackendParameters operatorStateBackendParameters) throws Exception {
            return new DefaultOperatorStateBackendBuilder(operatorStateBackendParameters.getEnv().getUserCodeClassLoader().asClassLoader(), operatorStateBackendParameters.getEnv().getExecutionConfig(), true, operatorStateBackendParameters.getStateHandles(), operatorStateBackendParameters.getCancelStreamRegistry()) { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.SyncFailureInducingStateBackend.1
                /* renamed from: build, reason: merged with bridge method [inline-methods] */
                public DefaultOperatorStateBackend m197build() {
                    return new DefaultOperatorStateBackend(this.executionConfig, this.cancelStreamRegistry, new HashMap(), new HashMap(), new HashMap(), new HashMap(), (SnapshotStrategyRunner) Mockito.mock(SnapshotStrategyRunner.class)) { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.SyncFailureInducingStateBackend.1.1
                        @Nonnull
                        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) throws Exception {
                            throw new Exception("Sync part snapshot exception.");
                        }
                    };
                }
            }.m197build();
        }

        /* renamed from: configure, reason: merged with bridge method [inline-methods] and merged with bridge method [inline-methods] */
        public SyncFailureInducingStateBackend m196configure(ReadableConfig readableConfig, ClassLoader classLoader) {
            return this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$TestDeclinedCheckpointResponder.class */
    public static class TestDeclinedCheckpointResponder implements CheckpointResponder {
        final OneShotLatch declinedLatch;

        private TestDeclinedCheckpointResponder() {
            this.declinedLatch = new OneShotLatch();
        }

        public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
            throw new RuntimeException("Unexpected call.");
        }

        public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics) {
        }

        public void declineCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointException checkpointException) {
            this.declinedLatch.trigger();
        }

        public void reportInitializationMetrics(JobID jobID, SubTaskInitializationMetrics subTaskInitializationMetrics) {
        }

        public OneShotLatch getDeclinedLatch() {
            return this.declinedLatch;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$TestOperator.class */
    private static final class TestOperator extends StreamFilter<Object> {
        private static final long serialVersionUID = 1;

        public TestOperator() {
            super(new FilterFunction<Object>() { // from class: org.apache.flink.streaming.runtime.tasks.TaskCheckpointingBehaviourTest.TestOperator.1
                public boolean filter(Object obj) {
                    return false;
                }
            });
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            OperatorStateCheckpointOutputStream rawOperatorStateOutput = stateSnapshotContext.getRawOperatorStateOutput();
            TaskCheckpointingBehaviourTest.IN_CHECKPOINT_LATCH.trigger();
            rawOperatorStateOutput.write(1);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest$TestStreamTask.class */
    public static final class TestStreamTask extends OneInputStreamTask<Object, Object> {
        public TestStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        public void init() {
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            triggerCheckpointOnBarrier(new CheckpointMetaData(11L, System.currentTimeMillis()), CheckpointOptions.forCheckpointWithDefaultLocation(), new CheckpointMetricsBuilder());
            while (isRunning()) {
                Thread.sleep(1L);
            }
            controller.suspendDefaultAction();
            this.mailboxProcessor.suspend();
        }

        protected void cleanUpInternal() {
        }
    }

    @Test
    public void testDeclineOnCheckpointErrorInSyncPart() throws Exception {
        runTaskExpectFailure(createTask(new FilterOperator(), new SyncFailureInducingStateBackend(), new TestDeclinedCheckpointResponder()));
    }

    @Test
    public void testDeclineOnCheckpointErrorInAsyncPart() throws Exception {
        TestDeclinedCheckpointResponder testDeclinedCheckpointResponder = new TestDeclinedCheckpointResponder();
        runTaskExpectCheckpointDeclined(createTask(new FilterOperator(), new AsyncFailureInducingStateBackend(), testDeclinedCheckpointResponder), testDeclinedCheckpointResponder);
    }

    @Test
    public void testBlockingNonInterruptibleCheckpoint() throws Exception {
        Task createTask = createTask(new TestOperator(), new BackendForTestStream(() -> {
            return new LockingOutputStream();
        }), (CheckpointResponder) Mockito.mock(CheckpointResponder.class));
        createTask.startTaskThread();
        IN_CHECKPOINT_LATCH.await();
        createTask.cancelExecution();
        createTask.getExecutingThread().join();
        Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
        Assert.assertNull(createTask.getFailureCause());
    }

    private void runTaskExpectCheckpointDeclined(Task task, TestDeclinedCheckpointResponder testDeclinedCheckpointResponder) throws Exception {
        task.startTaskThread();
        testDeclinedCheckpointResponder.declinedLatch.await();
        Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
        task.cancelExecution();
        task.getExecutingThread().join();
    }

    private void runTaskExpectFailure(Task task) throws Exception {
        task.startTaskThread();
        task.getExecutingThread().join();
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
    }

    private static Task createTask(StreamOperator<?> streamOperator, StateBackend stateBackend, CheckpointResponder checkpointResponder) throws IOException {
        Configuration configuration = new Configuration();
        StreamConfig streamConfig = new StreamConfig(configuration);
        streamConfig.setStreamOperator(streamOperator);
        streamConfig.setOperatorID(new OperatorID());
        streamConfig.setStateBackend(stateBackend);
        streamConfig.serializeAllConfigs();
        JobInformation jobInformation = new JobInformation(new JobID(), "test job name", new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList());
        TaskInformation taskInformation = new TaskInformation(new JobVertexID(), "test task name", 1, 11, TestStreamTask.class.getName(), configuration);
        return new Task(jobInformation, taskInformation, ExecutionGraphTestUtils.createExecutionAttemptId(taskInformation.getJobVertexId()), new AllocationID(), Collections.emptyList(), Collections.emptyList(), (MemoryManager) Mockito.mock(MemoryManager.class), new SharedResources(), (IOManager) Mockito.mock(IOManager.class), new NettyShuffleEnvironmentBuilder().build(), new KvStateService(new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null), (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), new TaskEventDispatcher(), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES, new TestTaskStateManager(), (TaskManagerActions) Mockito.mock(TaskManagerActions.class), (InputSplitProvider) Mockito.mock(InputSplitProvider.class), checkpointResponder, new NoOpTaskOperatorEventGateway(), new TestGlobalAggregateManager(), TestingClassLoaderLease.newBuilder().build(), new FileCache(new String[]{EnvironmentInformation.getTemporaryFileDirectory()}, VoidPermanentBlobService.INSTANCE), new TestingTaskManagerRuntimeInfo(), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(), (PartitionProducerStateChecker) Mockito.mock(PartitionProducerStateChecker.class), Executors.directExecutor(), new ChannelStateWriteRequestExecutorFactory(jobInformation.getJobId()));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1923075322:
                if (implMethodName.equals("lambda$testBlockingNonInterruptibleCheckpoint$a2be9aef$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/runtime/state/testutils/BackendForTestStream$StreamFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/runtime/state/CheckpointStateOutputStream;")) {
                    return () -> {
                        return new LockingOutputStream();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
