package org.apache.flink.test.checkpointing.utils;

import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/CancellingIntegerSource.class */
public class CancellingIntegerSource extends RichSourceFunction<Integer> implements CheckpointedFunction, CheckpointListener {
    private final int count;
    private final Integer cancelAfter;

    @Nullable
    private transient Long cancelAfterCheckpointId;
    private volatile transient boolean isCanceled;
    private volatile transient int sentCount;
    private transient ListState<Integer> lastSentStored;

    private CancellingIntegerSource(int i, @Nullable Integer num) {
        Preconditions.checkArgument(i > 0);
        Preconditions.checkArgument(num == null || num.intValue() > 0);
        this.cancelAfter = num;
        this.count = i;
    }

    public void run(SourceFunction.SourceContext<Integer> sourceContext) throws InterruptedException {
        emitInLoop(sourceContext);
        awaitCancellation();
    }

    private void emitInLoop(SourceFunction.SourceContext<Integer> sourceContext) throws InterruptedException {
        while (this.sentCount < this.count && !this.isCanceled) {
            synchronized (sourceContext.getCheckpointLock()) {
                if (this.sentCount < this.count && !this.isCanceled) {
                    int i = this.sentCount;
                    this.sentCount = i + 1;
                    sourceContext.collect(Integer.valueOf(i));
                }
            }
            Thread.sleep(10L);
        }
    }

    private void awaitCancellation() {
        while (!this.isCanceled) {
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
                if (this.isCanceled) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.lastSentStored = functionInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("counter", Integer.class));
        if (functionInitializationContext.isRestored()) {
            this.sentCount = ((Integer) Iterables.getOnlyElement((Iterable) this.lastSentStored.get())).intValue();
        }
        Preconditions.checkState(this.cancelAfter == null || this.sentCount < this.cancelAfter.intValue());
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.lastSentStored.update(Collections.singletonList(Integer.valueOf(this.sentCount)));
        if (this.cancelAfter == null || this.cancelAfter.intValue() > this.sentCount || this.cancelAfterCheckpointId != null) {
            return;
        }
        this.cancelAfterCheckpointId = Long.valueOf(functionSnapshotContext.getCheckpointId());
    }

    public void notifyCheckpointComplete(long j) {
        if (this.cancelAfterCheckpointId == null || this.cancelAfterCheckpointId.longValue() > j) {
            return;
        }
        cancel();
    }

    public void notifyCheckpointAborted(long j) {
    }

    public void cancel() {
        this.isCanceled = true;
    }

    public static CancellingIntegerSource upTo(int i, boolean z) {
        return new CancellingIntegerSource(i, z ? null : Integer.valueOf(i));
    }
}
