/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.windowing;

import java.io.Serializable;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

public class GroupedProcessingTimeWindowExample {
    /**
     * Builds and runs the example streaming job.
     *
     * <p>The pipeline reads {@code (key, 1)} pairs from {@link DataSource}, keys the stream by the
     * first tuple field, groups the records into sliding processing-time windows (2500 ms long,
     * sliding every 500 ms), sums the second field per key and window with
     * {@link SummingReducer}, and finally discards the results.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the element type on the stream so the key selector lambda and the
        // reducer are type-checked instead of relying on raw types and casts.
        DataStreamSource<Tuple2<Long, Long>> stream = env.addSource(new DataSource());

        stream.keyBy(value -> value.f0)
                .window(SlidingProcessingTimeWindows.of(Time.milliseconds(2500L), Time.milliseconds(500L)))
                .reduce(new SummingReducer())
                .addSink(new DiscardingSink<>());

        env.execute();
    }

    private static class DataSource
    extends RichParallelSourceFunction<Tuple2<Long, Long>> {
        private volatile boolean running = true;

        private DataSource() {
        }

        public void run(SourceFunction.SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
            long startTime = System.currentTimeMillis();
            long numElements = 20000000L;
            long numKeys = 10000L;
            long val = 1L;
            for (long count = 0L; this.running && count < 20000000L; ++count) {
                ctx.collect((Object)new Tuple2((Object)val++, (Object)1L));
                if (val <= 10000L) continue;
                val = 1L;
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Took " + (endTime - startTime) + " msecs for " + 20000000L + " values");
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class SummingReducer
    implements ReduceFunction<Tuple2<Long, Long>> {
        private SummingReducer() {
        }

        public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
            return new Tuple2(value1.f0, (Object)((Long)value1.f1 + (Long)value2.f1));
        }
    }

    private static class SummingWindowFunction
    implements WindowFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Long, Window> {
        private SummingWindowFunction() {
        }

        public void apply(Long key, Window window, Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) {
            long sum = 0L;
            for (Tuple2<Long, Long> value : values) {
                sum += ((Long)value.f1).longValue();
            }
            out.collect((Object)new Tuple2((Object)key, (Object)sum));
        }
    }

    private static class FirstFieldKeyExtractor<Type extends Tuple, Key>
    implements KeySelector<Type, Key> {
        private FirstFieldKeyExtractor() {
        }

        public Key getKey(Type value) {
            return (Key)value.getField(0);
        }
    }
}

