/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.sideoutput;

import java.io.Serializable;
import java.time.Duration;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.examples.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.ParameterTool;

public class SideOutputExample {
    private static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected"){};

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator textWithTimestampAndWatermark;
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)params);
        if (params.has("input")) {
            FileSource fileSource = FileSource.forRecordStreamFormat((StreamFormat)new TextLineInputFormat(), (Path[])new Path[]{new Path(params.get("input"))}).build();
            textWithTimestampAndWatermark = env.fromSource((Source)fileSource, IngestionTimeWatermarkStrategy.create(), "Words Source");
        } else {
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            DataStreamSource text = env.fromData((Object[])WordCountData.WORDS);
            textWithTimestampAndWatermark = text.assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create());
        }
        boolean asyncState = params.has("async-state");
        SingleOutputStreamOperator tokenized = textWithTimestampAndWatermark.process((ProcessFunction)new Tokenizer());
        SingleOutputStreamOperator rejectedWords = tokenized.getSideOutput(rejectedWordsTag).map((MapFunction & Serializable)value -> "rejected: " + value, Types.STRING);
        KeyedStream keyedTokenized = tokenized.keyBy((KeySelector & Serializable)value -> (String)value.f0);
        if (asyncState) {
            keyedTokenized.enableAsyncState();
        }
        SingleOutputStreamOperator counts = keyedTokenized.window((WindowAssigner)TumblingEventTimeWindows.of((Duration)Duration.ofSeconds(5L))).sum(1);
        if (params.has("output")) {
            counts.sinkTo((Sink)((FileSink.DefaultRowFormatBuilder)FileSink.forRowFormat((Path)new Path(params.get("output")), (Encoder)new SimpleStringEncoder()).withRollingPolicy((RollingPolicy)DefaultRollingPolicy.builder().withMaxPartSize(MemorySize.ofMebiBytes((long)1L)).withRolloverInterval(Duration.ofSeconds(10L)).build())).build()).name("output");
            rejectedWords.sinkTo((Sink)((FileSink.DefaultRowFormatBuilder)FileSink.forRowFormat((Path)new Path(params.get("rejected-words-output")), (Encoder)new SimpleStringEncoder()).withRollingPolicy((RollingPolicy)DefaultRollingPolicy.builder().withMaxPartSize(MemorySize.ofMebiBytes((long)1L)).withRolloverInterval(Duration.ofSeconds(10L)).build())).build()).name("rejected-words-output");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
            rejectedWords.print();
        }
        env.execute("Streaming WordCount SideOutput");
    }

    private static class IngestionTimeWatermarkStrategy<T>
    implements WatermarkStrategy<T> {
        private IngestionTimeWatermarkStrategy() {
        }

        public static <T> IngestionTimeWatermarkStrategy<T> create() {
            return new IngestionTimeWatermarkStrategy<T>();
        }

        public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new AscendingTimestampsWatermarks();
        }

        public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, timestamp) -> System.currentTimeMillis();
        }
    }

    public static final class Tokenizer
    extends ProcessFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void processElement(String value, ProcessFunction.Context ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens;
            for (String token : tokens = value.toLowerCase().split("\\W+")) {
                if (token.length() > 5) {
                    ctx.output(rejectedWordsTag, (Object)token);
                    continue;
                }
                if (token.length() <= 0) continue;
                out.collect((Object)new Tuple2((Object)token, (Object)1));
            }
        }
    }
}

