/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FileSinkMigrationITCase
extends TestLogger {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static final String SOURCE_UID = "source";
    private static final String SINK_UID = "sink";
    private static final int NUM_SOURCES = 4;
    private static final int NUM_SINKS = 3;
    private static final int NUM_RECORDS = 10000;
    private static final int NUM_BUCKETS = 4;
    private static final Map<String, CountDownLatch> SAVEPOINT_LATCH_MAP = new ConcurrentHashMap<String, CountDownLatch>();
    private static final Map<String, CountDownLatch> FINAL_CHECKPOINT_LATCH_MAP = new ConcurrentHashMap<String, CountDownLatch>();
    private String latchId;

    @Before
    public void setup() {
        this.latchId = UUID.randomUUID().toString();
        SAVEPOINT_LATCH_MAP.put(this.latchId, new CountDownLatch(4));
        FINAL_CHECKPOINT_LATCH_MAP.put(this.latchId, new CountDownLatch(8));
    }

    @After
    public void teardown() {
        SAVEPOINT_LATCH_MAP.remove(this.latchId);
        FINAL_CHECKPOINT_LATCH_MAP.remove(this.latchId);
    }

    @Test
    public void testMigration() throws Exception {
        String outputPath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        String savepointBasePath = TEMPORARY_FOLDER.newFolder().getAbsolutePath();
        Configuration config = new Configuration();
        config.setString(RestOptions.BIND_PORT, "18081-19000");
        MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().setNumTaskManagers(1).setNumSlotsPerTaskManager(4).setConfiguration(config).build();
        JobGraph streamingFileSinkJobGraph = this.createStreamingFileSinkJobGraph(outputPath);
        String savepointPath = this.executeAndTakeSavepoint(cfg, streamingFileSinkJobGraph, savepointBasePath);
        JobGraph fileSinkJobGraph = this.createFileSinkJobGraph(outputPath);
        this.loadSavepointAndExecute(cfg, fileSinkJobGraph, savepointPath);
        IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(outputPath, 10000, 4, 4);
    }

    private JobGraph createStreamingFileSinkJobGraph(String outputPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        StreamingFileSink sink = ((StreamingFileSink.DefaultRowFormatBuilder)((StreamingFileSink.DefaultRowFormatBuilder)StreamingFileSink.forRowFormat((Path)new Path(outputPath), (Encoder)new IntegerFileSinkTestDataUtils.IntEncoder()).withBucketAssigner((BucketAssigner)new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(4))).withRollingPolicy((RollingPolicy)OnCheckpointRollingPolicy.build())).build();
        env.addSource((SourceFunction)new StatefulSource(true, this.latchId)).uid(SOURCE_UID).setParallelism(4).addSink((SinkFunction)sink).setParallelism(3).uid(SINK_UID);
        return env.getStreamGraph().getJobGraph();
    }

    private JobGraph createFileSinkJobGraph(String outputPath) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        FileSink sink = ((FileSink.DefaultRowFormatBuilder)((FileSink.DefaultRowFormatBuilder)FileSink.forRowFormat((Path)new Path(outputPath), (Encoder)new IntegerFileSinkTestDataUtils.IntEncoder()).withBucketAssigner((BucketAssigner)new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(4))).withRollingPolicy((RollingPolicy)OnCheckpointRollingPolicy.build())).build();
        env.addSource((SourceFunction)new StatefulSource(false, this.latchId)).uid(SOURCE_UID).setParallelism(4).sinkTo((Sink)sink).setParallelism(3).uid(SINK_UID);
        return env.getStreamGraph().getJobGraph();
    }

    private String executeAndTakeSavepoint(MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointBasePath) throws Exception {
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            CompletableFuture jobSubmissionResultFuture = miniCluster.submitJob(jobGraph);
            JobID jobId = ((JobSubmissionResult)jobSubmissionResultFuture.get()).getJobID();
            CountDownLatch latch = SAVEPOINT_LATCH_MAP.get(this.latchId);
            latch.await();
            CompletableFuture savepointResultFuture = miniCluster.triggerSavepoint(jobId, savepointBasePath, true);
            String string = (String)savepointResultFuture.get();
            return string;
        }
    }

    private void loadSavepointAndExecute(MiniClusterConfiguration cfg, JobGraph jobGraph, String savepointPath) throws Exception {
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath, (boolean)false));
        try (MiniCluster miniCluster = new MiniCluster(cfg);){
            miniCluster.start();
            miniCluster.executeJobBlocking(jobGraph);
        }
    }

    private static class StatefulSource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointedFunction,
    CheckpointListener {
        private final boolean takingSavepointMode;
        private final String latchId;
        private ListState<Integer> nextValueState;
        private int nextValue;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;
        private volatile boolean isCanceled;

        public StatefulSource(boolean takingSavepointMode, String latchId) {
            this.takingSavepointMode = takingSavepointMode;
            this.latchId = latchId;
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.nextValueState = context.getOperatorStateStore().getListState(new ListStateDescriptor("nextValue", Integer.class));
            if (this.nextValueState.get() != null && ((Iterable)this.nextValueState.get()).iterator().hasNext()) {
                this.nextValue = (Integer)((Iterable)this.nextValueState.get()).iterator().next();
            }
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            if (this.takingSavepointMode) {
                this.sendRecordsUntil(3333, 0, ctx);
                CountDownLatch latch = (CountDownLatch)SAVEPOINT_LATCH_MAP.get(this.latchId);
                latch.countDown();
                this.sendRecordsUntil(5000, 100, ctx);
                while (true) {
                    Thread.sleep(5000L);
                }
            }
            this.sendRecordsUntil(10000, 0, ctx);
            this.isWaitingCheckpointComplete = true;
            CountDownLatch latch = (CountDownLatch)FINAL_CHECKPOINT_LATCH_MAP.get(this.latchId);
            latch.await();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void sendRecordsUntil(int targetNumber, int sleepInMillis, SourceFunction.SourceContext<Integer> ctx) throws InterruptedException {
            while (!this.isCanceled && this.nextValue < targetNumber) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.nextValue++);
                }
                if (sleepInMillis <= 0) continue;
                Thread.sleep(sleepInMillis);
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.nextValueState.update(Collections.singletonList(this.nextValue));
            if (this.isWaitingCheckpointComplete) {
                this.snapshottedAfterAllRecordsOutput = true;
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.isWaitingCheckpointComplete && this.snapshottedAfterAllRecordsOutput) {
                CountDownLatch latch = (CountDownLatch)FINAL_CHECKPOINT_LATCH_MAP.get(this.latchId);
                latch.countDown();
            }
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }
}

