/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.sink.FileSinkITBase;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Variant of {@link FileSinkITBase} whose job graph is built with the runtime execution mode set
 * to {@link RuntimeExecutionMode#BATCH}.
 *
 * <p>The job consists of a bounded {@link SourceFunction}-based source, a rebalance, a map that
 * can fail once on purpose, and the sink returned by {@code createFileSink(path)}.
 */
@RunWith(Parameterized.class)
public class BatchExecutionFileSinkITCase extends FileSinkITBase {

    /**
     * Number of records emitted by the test source; also the reference value for the failure
     * threshold in {@link BatchExecutionOnceFailingMap}, so both must stay in sync.
     * NOTE(review): presumably the base class verifies the sink output against this value -
     * confirm before changing it.
     */
    private static final int TOTAL_RECORDS = 10000;

    /** Parallelism applied to the source. */
    private static final int SOURCE_PARALLELISM = 4;

    /** Parallelism applied to both the map operator and the sink. */
    private static final int SINK_PARALLELISM = 3;

    /** Fraction of {@code maxNumber} at which the designated failure is raised. */
    private static final double FAILURE_THRESHOLD_RATIO = 0.4;

    /**
     * Creates the job graph of a batch-mode job that writes integers to a file sink.
     *
     * @param path passed to {@code createFileSink(String)} to build the sink
     * @return the job graph derived from the environment's stream graph
     */
    @Override
    protected JobGraph createJobGraph(String path) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        env.configure(config, getClass().getClassLoader());

        // A single restart is enough: the failing map only throws on attempt number 0.
        if (triggerFailover) {
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100L)));
        } else {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }

        // The source is wired up by hand so that a SourceFunction-based source can be declared
        // as Boundedness.BOUNDED.
        StreamSource<Integer, ?> sourceOperator =
                new StreamSource<>(new BatchExecutionTestSource(TOTAL_RECORDS));
        DataStreamSource<Integer> source =
                new DataStreamSource<>(
                        env,
                        BasicTypeInfo.INT_TYPE_INFO,
                        sourceOperator,
                        true,
                        "Source",
                        Boundedness.BOUNDED);

        source.setParallelism(SOURCE_PARALLELISM)
                .rebalance()
                .map(new BatchExecutionOnceFailingMap(TOTAL_RECORDS, triggerFailover))
                .setParallelism(SINK_PARALLELISM)
                .sinkTo(createFileSink(path))
                .setParallelism(SINK_PARALLELISM);

        StreamGraph streamGraph = env.getStreamGraph();
        return streamGraph.getJobGraph();
    }

    /**
     * Identity map that throws one designated failure when failover is enabled.
     *
     * <p>The failure is raised only by the subtask with index 0, only while its attempt number
     * is 0, and only for values of at least {@code FAILURE_THRESHOLD_RATIO * maxNumber}.
     */
    private static class BatchExecutionOnceFailingMap extends RichMapFunction<Integer, Integer> {
        private final int maxNumber;
        private final boolean triggerFailover;

        /**
         * @param maxNumber reference value the failure threshold is computed from
         * @param triggerFailover whether the designated failure may be thrown at all
         */
        public BatchExecutionOnceFailingMap(int maxNumber, boolean triggerFailover) {
            this.maxNumber = maxNumber;
            this.triggerFailover = triggerFailover;
        }

        /**
         * Returns the input unchanged, or throws the designated failure.
         *
         * @param value the incoming record
         * @return {@code value}, untouched
         * @throws RuntimeException with message "Designated Failure" when all failure
         *     conditions described on this class hold
         */
        @Override
        public Integer map(Integer value) {
            if (triggerFailover
                    && getRuntimeContext().getIndexOfThisSubtask() == 0
                    && getRuntimeContext().getAttemptNumber() == 0
                    && value >= FAILURE_THRESHOLD_RATIO * maxNumber) {
                throw new RuntimeException("Designated Failure");
            }
            return value;
        }
    }

    /**
     * Source function whose {@code run} emits the integers {@code 0} (inclusive) to
     * {@code numberOfRecords} (exclusive) in ascending order, stopping early once canceled.
     */
    private static class BatchExecutionTestSource extends RichParallelSourceFunction<Integer> {
        private final int numberOfRecords;

        // volatile: set by cancel(), read by the loop in run(); cancel() may be invoked from a
        // different thread than the one executing run().
        private volatile boolean isCanceled;

        /** @param numberOfRecords how many consecutive integers, starting at 0, to emit */
        public BatchExecutionTestSource(int numberOfRecords) {
            this.numberOfRecords = numberOfRecords;
        }

        /**
         * Emits the integer sequence until it is exhausted or the source is canceled.
         *
         * @param ctx context the records are collected into
         */
        @Override
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; !isCanceled && i < numberOfRecords; ++i) {
                ctx.collect(i);
            }
        }

        /** Requests the emitting loop in {@code run} to stop before its next record. */
        @Override
        public void cancel() {
            isCanceled = true;
        }
    }
}

