/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.curator5.org.apache.curator.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.guava31.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.LogLevelExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.event.Level;

class UnalignedCheckpointStressITCase {
    // Interval between checkpoints configured for the job.
    // NOTE(review): presumably milliseconds (value handed to enableCheckpointing) - confirm.
    private static final int CHECKPOINT_INTERVAL = 20;
    // Inclusive lower / upper bound for how many checkpoints FailingMapper lets complete
    // before it throws its expected failure.
    private static final int MINIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 2;
    private static final int MAXIMUM_COMPLETED_CHECKPOINTS_BETWEEN_FAILURES = 10;
    // Wall-clock budget of runStressTest in milliseconds (20 seconds).
    private static final long TEST_DURATION = Time.seconds((long)20L).toMilliseconds();
    // Shape of the mini cluster started in setup(): 3 task managers with 2 slots each.
    private static final int NUM_TASK_MANAGERS = 3;
    private static final int NUM_TASK_SLOTS = 2;
    // Default parallelism of the job (equals NUM_TASK_MANAGERS * NUM_TASK_SLOTS).
    private static final int PARALLELISM = 6;
    // NOTE(review): BUFFER_SIZE and BUFFER_TIME are not referenced by name anywhere in this
    // file; they look like inlined or leftover constants - confirm before relying on them.
    private static final int BUFFER_SIZE = 4096;
    private static final int BUFFER_TIME = 4;
    // NOTE(review): presumably the base sleep (ms) and base payload size (bytes) used by the
    // record generator modes; the values appear only as literals below - confirm.
    private static final int NORMAL_RECORD_SLEEP = 1;
    private static final int SMALL_RECORD_SIZE = 1024;
    // JUnit extension that switches NetworkActionsLogger to TRACE level for this test class.
    @RegisterExtension
    public static final LogLevelExtension NETWORK_LOGGER = new LogLevelExtension().set(NetworkActionsLogger.class, Level.TRACE);
    // Per-test temporary directory; used as the checkpoint directory and scanned for
    // retained checkpoints.
    @TempDir
    public File temporaryFolder;
    // Per-test temporary directory under which the changelog storage folder is created.
    @TempDir
    public Path changelogFolder;
    // Mini cluster created in setup() and torn down in shutDownExistingCluster().
    private MiniClusterWithClientResource cluster;
    // Matches a trailing run of digits preceded by at least one non-digit character and
    // captures the digits in group 1; used to read the checkpoint number from a directory path.
    private static final Pattern LAST_INT_PATTERN = Pattern.compile("[^0-9]+([0-9]+)$");

    // Package-private no-arg constructor; performs no initialization of its own.
    UnalignedCheckpointStressITCase() {
    }

    /**
     * Starts a fresh mini cluster before each test.
     *
     * <p>The cluster writes checkpoints into {@link #temporaryFolder}, retains a single completed
     * checkpoint, and uses a file-system changelog storage rooted in a new folder below
     * {@link #changelogFolder}.
     *
     * @throws Exception if the changelog folder cannot be created or the cluster fails to start
     */
    @BeforeEach
    void setup() throws Exception {
        Configuration configuration = new Configuration();
        // The option values are passed with their real types (String / Integer) so that the
        // generic ConfigOption<T> signature is type-checked by the compiler.
        configuration.set(
                CheckpointingOptions.CHECKPOINTS_DIRECTORY, temporaryFolder.toURI().toString());
        configuration.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, 1);
        FsStateChangelogStorageFactory.configure(
                configuration,
                TempDirUtils.newFolder(changelogFolder),
                Duration.ofMinutes(1L),
                10);

        cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(configuration)
                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
                                .build());
        cluster.before();
    }

    /** Stops the mini cluster after each test, if one was started, and forgets the reference. */
    @AfterEach
    void shutDownExistingCluster() {
        if (cluster == null) {
            // setup() never got far enough to create a cluster; nothing to tear down.
            return;
        }
        cluster.after();
        cluster = null;
    }

    /**
     * Repeatedly runs the job until {@link #TEST_DURATION} has elapsed. Every run after the first
     * is restored from the checkpoint retained by the previous run, and after each run everything
     * in the temporary folder except that checkpoint's parent directory is deleted.
     *
     * @throws Exception if a run fails with anything other than the expected test failure
     */
    @Test
    void runStressTest() throws Exception {
        final Deadline testEnd = Deadline.fromNow(Duration.ofMillis(TEST_DURATION));
        // null on the first iteration: the first run starts without restore settings.
        File latestCheckpoint = null;
        while (testEnd.hasTimeLeft()) {
            latestCheckpoint = runAndTakeExternalCheckpoint(latestCheckpoint);
            cleanDirectoryExcept(latestCheckpoint);
        }
    }

    /**
     * Defines the job topology on the given environment.
     *
     * <p>Four groups of {@link LegacySourceFunction} sources (parallelism 6, 3, 2 and 1) are
     * unioned pairwise, keyed by source id, connected and validated. The result is rebalanced,
     * validated again, keyed, throttled, windowed in 5 ms tumbling processing-time windows,
     * re-emitted, throttled a second time with parallelism 1 and finally passed through the
     * {@link FailingMapper}.
     *
     * @param env environment the operators are added to
     */
    private void testProgram(StreamExecutionEnvironment env) {
        int numberOfSources1 = 6;
        int numberOfSources2 = 3;
        int numberOfSources3 = 2;
        int numberOfSources4 = 1;
        int totalNumberOfSources =
                numberOfSources1 + numberOfSources2 + numberOfSources3 + numberOfSources4;

        // Each group's offset is the total parallelism of the groups before it; the source adds
        // its subtask index to the offset, so every source subtask gets a distinct source id.
        DataStreamSource<Record> source1 =
                env.addSource(new LegacySourceFunction(0)).setParallelism(numberOfSources1);
        DataStreamSource<Record> source2 =
                env.addSource(new LegacySourceFunction(numberOfSources1))
                        .setParallelism(numberOfSources2);
        DataStreamSource<Record> source3 =
                env.addSource(new LegacySourceFunction(numberOfSources1 + numberOfSources2))
                        .setParallelism(numberOfSources3);
        DataStreamSource<Record> source4 =
                env.addSource(
                                new LegacySourceFunction(
                                        numberOfSources1 + numberOfSources2 + numberOfSources3))
                        .setParallelism(numberOfSources4);

        DataStream<Record> source12 = source1.union(source2);
        DataStream<Record> source34 = source3.union(source4);

        SingleOutputStreamOperator<Record> sources =
                source12.keyBy(Record::getSourceId)
                        .connect(source34.keyBy(Record::getSourceId))
                        .process(
                                new KeyedCoProcessFunction<Integer, Record, Record, Record>() {

                                    @Override
                                    public void processElement1(
                                            Record value, Context ctx, Collector<Record> out) {
                                        out.collect(value.validate());
                                    }

                                    @Override
                                    public void processElement2(
                                            Record value, Context ctx, Collector<Record> out) {
                                        out.collect(value.validate());
                                    }
                                });

        SingleOutputStreamOperator<Record> stream =
                sources.rebalance()
                        .map(Record::validate)
                        .keyBy(Record::getSourceId)
                        // first throttling stage: sleeps for roughly 1 in 100 records
                        .map(new ThrottlingMap(100));

        // The stream is already partitioned by source id by the keyBy above, so it is
        // reinterpreted as keyed instead of being shuffled again.
        DataStreamUtils.reinterpretAsKeyedStream(stream, Record::getSourceId)
                .window(TumblingProcessingTimeWindows.of(Time.milliseconds(5L)))
                .process(new ReEmitAll())
                // second throttling stage, on a single subtask
                .map(new ThrottlingMap(Math.max(1, totalNumberOfSources - 2)))
                .setParallelism(1)
                .map(new FailingMapper())
                .setParallelism(1);
    }

    /**
     * Deletes every entry directly below {@link #temporaryFolder} except the parent directory of
     * the given checkpoint.
     *
     * @param externalizedCheckpoint checkpoint directory whose parent must be kept
     * @throws IOException if an entry cannot be deleted
     * @throws NullPointerException if the temporary folder cannot be listed
     */
    private void cleanDirectoryExcept(File externalizedCheckpoint) throws IOException {
        final File keep = externalizedCheckpoint.getParentFile();
        final File[] entries = Objects.requireNonNull(temporaryFolder.listFiles());
        for (File entry : entries) {
            if (!entry.equals(keep)) {
                FileUtils.deleteDirectory(entry);
            }
        }
    }

    /**
     * Builds the job, optionally restores it from {@code startingCheckpoint}, runs it until it
     * terminates and returns the checkpoint that was retained.
     *
     * <p>A job failure caused by an {@link ExpectedTestException} is the normal way for a run to
     * end and is swallowed; any other failure is rethrown.
     *
     * @param startingCheckpoint checkpoint directory to restore from, or {@code null} to start
     *     without restore settings
     * @return directory of the retained checkpoint found after the run
     * @throws Exception if the job fails unexpectedly or no retained checkpoint is found
     */
    private File runAndTakeExternalCheckpoint(@Nullable File startingCheckpoint) throws Exception {
        StreamExecutionEnvironment env = defineEnvironment();
        testProgram(env);

        StreamGraph streamGraph = env.getStreamGraph();
        if (startingCheckpoint != null) {
            streamGraph.setSavepointRestoreSettings(
                    SavepointRestoreSettings.forPath(startingCheckpoint.toString()));
        }
        JobGraph jobGraph = streamGraph.getJobGraph();

        try {
            TestUtils.submitJobAndWaitForResult(
                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
        } catch (Exception e) {
            boolean failedAsExpected =
                    ExceptionUtils.findThrowable(e, ExpectedTestException.class).isPresent();
            if (!failedAsExpected) {
                throw e;
            }
        }
        return discoverRetainedCheckpoint();
    }

    /**
     * Reads the checkpoint number from the trailing digits of a checkpoint directory path.
     *
     * @param checkpointDir path whose string form ends in a non-digit followed by digits
     * @return the trailing digits parsed as an {@code int}
     * @throws IllegalStateException if the path does not match {@link #LAST_INT_PATTERN}
     * @throws NumberFormatException if the digits do not fit into an {@code int}
     */
    private static int getCheckpointNumberFromPath(Path checkpointDir) {
        final Matcher trailingDigits = LAST_INT_PATTERN.matcher(checkpointDir.toString());
        Preconditions.checkState(trailingDigits.find());
        final String checkpointNumber = trailingDigits.group(1);
        return Integer.parseInt(checkpointNumber);
    }

    /**
     * Polls {@link #temporaryFolder} for the checkpoint directory with the highest checkpoint
     * number that contains a {@code _metadata} file.
     *
     * <p>The folder is scanned up to 1001 times with a 5 ms sleep before each scan.
     *
     * @return the directory of the newest checkpoint found
     * @throws IllegalStateException if no {@code _metadata} file shows up within the polling
     *     budget; the message lists every path found below the temporary folder
     * @throws Exception if walking the directory tree fails or the thread is interrupted
     */
    private File discoverRetainedCheckpoint() throws Exception {
        final Path root = Paths.get(temporaryFolder.getPath());

        Path checkpointDir = null;
        for (int attempt = 0; attempt <= 1000 && checkpointDir == null; attempt++) {
            Thread.sleep(5L);
            MaxCheckpointFileVisitor visitor = new MaxCheckpointFileVisitor();
            Files.walkFileTree(root, visitor);
            checkpointDir = visitor.getMaxCheckpointDir();
        }

        if (checkpointDir == null) {
            // Files.walk returns a stream backed by open directory handles, so it is closed via
            // try-with-resources even though this branch always ends by throwing.
            try (Stream<Path> contents = Files.walk(root)) {
                List<Path> files = contents.collect(Collectors.toList());
                throw new IllegalStateException("Failed to find _metadata file among " + files);
            }
        }
        return checkpointDir.toFile();
    }

    /**
     * Creates the execution environment for one run: default parallelism {@link #PARALLELISM},
     * unaligned checkpoints every {@link #CHECKPOINT_INTERVAL}, no restarts, and externalized
     * checkpoints configured with {@code DELETE_ON_CANCELLATION} cleanup.
     *
     * @return the configured environment, with no operators defined yet
     */
    private StreamExecutionEnvironment defineEnvironment() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(CHECKPOINT_INTERVAL);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.enableUnalignedCheckpoints();
        // A failing job must end the run instead of being restarted by the cluster.
        env.setRestartStrategy(RestartStrategies.noRestart());
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        return env;
    }

    /**
     * File visitor that remembers the directory holding a {@code _metadata} file whose path ends
     * in the highest checkpoint number seen during the walk.
     */
    private static class MaxCheckpointFileVisitor extends SimpleFileVisitor<Path> {
        /** Best directory found so far; {@code null} until a {@code _metadata} file is seen. */
        private Path maxCheckpointDir;

        private MaxCheckpointFileVisitor() {
        }

        /** Records the parent of {@code path} if it is a {@code _metadata} file of a newer checkpoint. */
        @Override
        public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) {
            if (!path.endsWith("_metadata")) {
                return FileVisitResult.CONTINUE;
            }
            Path candidateDir = path.getParent();
            int candidateId = UnalignedCheckpointStressITCase.getCheckpointNumberFromPath(candidateDir);
            int bestIdSoFar = -1;
            if (maxCheckpointDir != null) {
                bestIdSoFar = UnalignedCheckpointStressITCase.getCheckpointNumberFromPath(maxCheckpointDir);
            }
            if (candidateId > bestIdSoFar) {
                maxCheckpointDir = candidateDir;
            }
            return FileVisitResult.CONTINUE;
        }

        /** Skips files that disappeared during the walk; every other failure is rethrown. */
        @Override
        public FileVisitResult visitFileFailed(Path file, IOException ex) throws IOException {
            if (!(ex instanceof NoSuchFileException)) {
                throw ex;
            }
            return FileVisitResult.CONTINUE;
        }

        /** Returns the directory with the highest checkpoint number, or {@code null} if none was found. */
        public Path getMaxCheckpointDir() {
            return maxCheckpointDir;
        }
    }

    private static class ThrottlingMap
    implements MapFunction<Record, Record> {
        private final int chance;

        public ThrottlingMap(int chance) {
            this.chance = chance;
        }

        public Record map(Record value) throws Exception {
            if (ThreadLocalRandom.current().nextInt(this.chance) == 0) {
                Thread.sleep(1L);
            }
            return value.validate();
        }
    }

    /**
     * Test record whose payload bytes are fully determined by {@link #value} and the byte
     * position, so that a corrupted payload can be detected by {@link #validate()}.
     */
    public static class Record implements Serializable {
        public int sourceId;
        public long value;
        public byte[] payload;

        /**
         * @param sourceId id of the source that produced the record
         * @param value value the payload pattern is derived from
         * @param payloadSize number of payload bytes to generate
         */
        public Record(int sourceId, long value, int payloadSize) {
            this.sourceId = sourceId;
            this.value = value;
            this.payload = new byte[payloadSize];
            int position = 0;
            while (position < payload.length) {
                payload[position] = payloadAt(position);
                position++;
            }
        }

        public int getSourceId() {
            return sourceId;
        }

        public long getValue() {
            return value;
        }

        /**
         * Checks every payload byte against the expected pattern.
         *
         * @return this record, to allow chaining
         * @throws IllegalStateException on the first byte that differs from the expected one
         */
        public Record validate() {
            for (int position = 0; position < payload.length; position++) {
                byte expected = payloadAt(position);
                Preconditions.checkState(
                        payload[position] == expected,
                        "Expected %s at position %s, but found %s in %s",
                        expected,
                        position,
                        payload[position],
                        this);
            }
            return this;
        }

        /** Expected payload byte at {@code index}: {@code (value + index) % 128} narrowed to a byte. */
        private byte payloadAt(int index) {
            long pattern = (value + (long) index) % 128L;
            return (byte) pattern;
        }

        @Override
        public String toString() {
            return String.format(
                    "Record(sourceId=%d, payload.length=%d, value=%d)",
                    sourceId, payload.length, value);
        }
    }

    private static class FailingMapper
    implements MapFunction<Record, Record>,
    CheckpointListener {
        @Nullable
        private Long firstCompletedCheckpoint;
        @Nullable
        private Record lastProcessedRecord;
        private final int completedCheckpointsBeforeFailure = ThreadLocalRandom.current().nextInt(2, 11);

        private FailingMapper() {
        }

        public Record map(Record value) throws Exception {
            this.lastProcessedRecord = value;
            return value;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.firstCompletedCheckpoint == null) {
                this.firstCompletedCheckpoint = checkpointId;
            }
            if ((long)this.completedCheckpointsBeforeFailure <= checkpointId - this.firstCompletedCheckpoint) {
                throw new ExpectedTestException(this.lastProcessedRecord == null ? "no record" : this.lastProcessedRecord.toString());
            }
        }
    }

    private static class ReEmitAll
    extends ProcessWindowFunction<Record, Record, Integer, TimeWindow> {
        private ReEmitAll() {
        }

        public void process(Integer integer, ProcessWindowFunction.Context context, Iterable<Record> elements, Collector<Record> out) {
            for (Record element : elements) {
                out.collect((Object)element);
            }
        }
    }

    private static class LegacySourceFunction
    extends AbstractRichFunction
    implements ParallelSourceFunction<Record>,
    CheckpointedFunction {
        private final int sourceIdOffset;
        private long nextValue;
        private ListState<Long> nextState;
        private volatile boolean running = true;

        public LegacySourceFunction(int sourceIdOffset) {
            this.sourceIdOffset = sourceIdOffset;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Record> ctx) throws Exception {
            RecordGenerator generator = new RecordGenerator(this.getRuntimeContext().getIndexOfThisSubtask() + this.sourceIdOffset);
            while (this.running) {
                Record next = generator.next(this.nextValue);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ++this.nextValue;
                    ctx.collect((Object)next);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.nextState.clear();
            this.nextState.add((Object)this.nextValue);
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.nextState = context.getOperatorStateStore().getListState(new ListStateDescriptor("state", Long.class));
            this.nextValue = (Long)Objects.requireNonNull(Iterables.getOnlyElement((Iterable)((Iterable)this.nextState.get()), (Object)0L));
        }
    }

    private static enum SpeedMode {
        SLOW{

            @Override
            public int getSleep() {
                return ThreadLocalRandom.current().nextInt(10);
            }
        }
        ,
        NORMAL{

            @Override
            public int getSleep() {
                return ThreadLocalRandom.current().nextInt(2);
            }
        }
        ,
        FAST{

            @Override
            public int getSleep() {
                return ThreadLocalRandom.current().nextInt(10) == 0 ? 1 : 0;
            }
        }
        ,
        BURST{

            @Override
            public int getSleep() {
                int burstChance = 1000;
                return ThreadLocalRandom.current().nextInt(burstChance) == 0 ? burstChance * 1 : 0;
            }
        };


        public abstract int getSleep();

        public static SpeedMode valueOf(int n) {
            Preconditions.checkState((n >= 0 ? 1 : 0) != 0);
            return SpeedMode.values()[n % SpeedMode.values().length];
        }
    }

    private static enum SizeMode {
        SMALL{

            @Override
            public int getSize() {
                return 1024;
            }
        }
        ,
        LARGE{

            @Override
            public int getSize() {
                return 4096;
            }
        }
        ,
        RANDOM{

            @Override
            public int getSize() {
                return ThreadLocalRandom.current().nextInt(4) * 1024 + 1024;
            }
        };


        public static SizeMode valueOf(int n) {
            Preconditions.checkState((n >= 0 ? 1 : 0) != 0);
            return SizeMode.values()[n % SizeMode.values().length];
        }

        public abstract int getSize();
    }

    /**
     * Produces records for one source id. The size and speed profiles are both selected from the
     * source id, so different sources behave differently.
     */
    private static class RecordGenerator {
        private final int sourceId;
        private final SizeMode sizeMode;
        private final SpeedMode speedMode;

        public RecordGenerator(int sourceId) {
            this.sourceId = sourceId;
            this.sizeMode = SizeMode.valueOf(sourceId);
            this.speedMode = SpeedMode.valueOf(sourceId);
        }

        /**
         * Sleeps as dictated by the speed profile and then creates the next record.
         *
         * @param value value to store in the record
         * @return a new record with this generator's source id and a profile-dependent payload size
         * @throws InterruptedException if the thread is interrupted while sleeping
         */
        public Record next(long value) throws InterruptedException {
            final int pauseMillis = speedMode.getSleep();
            if (pauseMillis > 0) {
                Thread.sleep(pauseMillis);
            }
            final int payloadSize = sizeMode.getSize();
            return new Record(sourceId, value, payloadSize);
        }
    }
}

