/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.connector.file.sink.utils.PartSizeAndCheckpointRollingPolicy;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class FileSinkSpeculativeITCase {
    @RegisterExtension
    private static final MiniClusterExtension miniClusterExtension = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(4).setConfiguration(FileSinkSpeculativeITCase.configure()).build());
    @TempDir
    private java.nio.file.Path tmpDir;
    private static final int NUM_SOURCES = 4;
    private static final int NUM_SINKS = 3;
    private static final int NUM_RECORDS = 10000;
    private static final int NUM_BUCKETS = 4;
    private static final AtomicInteger slowTaskCounter = new AtomicInteger(1);

    FileSinkSpeculativeITCase() {
    }

    @BeforeEach
    void setUp() {
        slowTaskCounter.set(1);
    }

    @Test
    void testFileSinkSpeculative() throws Exception {
        String path = this.tmpDir.toString();
        this.executeJobWithSlowSink(path);
        IntegerFileSinkTestDataUtils.checkIntegerSequenceSinkOutput(path, 10000, 4, 4);
    }

    private void executeJobWithSlowSink(String path) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamSource sourceOperator = new StreamSource((SourceFunction)new BatchExecutionTestSource(10000));
        DataStreamSource source = new DataStreamSource(env, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, sourceOperator, true, "Source", Boundedness.BOUNDED);
        DataStreamSink sink = source.setParallelism(4).rebalance().map((MapFunction)new TestingMap()).name("test_map").setParallelism(3).sinkTo(this.createFileSink(path)).name("file_sink").setParallelism(3);
        JobGraph jobGraph = env.getStreamGraph(false).getJobGraph();
        for (JobVertex jobVertex : jobGraph.getVertices()) {
            if (!jobVertex.getName().contains("test_map")) continue;
            Assertions.assertThat((boolean)jobVertex.getName().contains("file_sink")).isTrue();
        }
        JobClient client = env.executeAsync("FileSinkSpeculativeITCase");
        client.getJobExecutionResult().get();
    }

    private FileSink<Integer> createFileSink(String path) {
        return ((FileSink.DefaultRowFormatBuilder)((FileSink.DefaultRowFormatBuilder)FileSink.forRowFormat((Path)new Path(path), (Encoder)new IntegerFileSinkTestDataUtils.IntEncoder()).withBucketAssigner((BucketAssigner)new IntegerFileSinkTestDataUtils.ModuloBucketAssigner(4))).withRollingPolicy(new PartSizeAndCheckpointRollingPolicy(1024L, false))).build();
    }

    private static Configuration configure() {
        Configuration configuration = new Configuration();
        configuration.set(RestOptions.BIND_PORT, (Object)"0");
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, (Object)Duration.ofMillis(5000L));
        configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, (Object)true);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, (Object)1.0);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, (Object)0.2);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, (Object)Duration.ofMillis(0L));
        configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, (Object)Duration.ofMillis(0L));
        return configuration;
    }

    private static void maybeSleep() {
        if (slowTaskCounter.getAndDecrement() > 0) {
            try {
                Thread.sleep(Integer.MAX_VALUE);
            }
            catch (Exception e) {
                throw new RuntimeException();
            }
        }
    }

    private static class TestingMap
    extends RichMapFunction<Integer, Integer> {
        private TestingMap() {
        }

        public Integer map(Integer value) throws Exception {
            FileSinkSpeculativeITCase.maybeSleep();
            return value;
        }
    }

    private static class BatchExecutionTestSource
    extends RichParallelSourceFunction<Integer> {
        private final int numberOfRecords;
        private volatile boolean isCanceled;

        public BatchExecutionTestSource(int numberOfRecords) {
            this.numberOfRecords = numberOfRecords;
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            for (int i = 0; !this.isCanceled && i < this.numberOfRecords; ++i) {
                ctx.collect((Object)i);
            }
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }
}

