/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.join;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.examples.join.WindowJoinSampleData;

public class WindowJoin {
    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        long windowSize = params.getLong("windowSize", 2000L);
        long rate = params.getLong("rate", 3L);
        System.out.println("Using windowSize=" + windowSize + ", data rate=" + rate);
        System.out.println("To customize example, use: WindowJoin [--windowSize <window-size-in-millis>] [--rate <elements-per-second>]");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.getConfig().setGlobalJobParameters((ExecutionConfig.GlobalJobParameters)params);
        DataStream<Tuple2<String, Integer>> grades = WindowJoinSampleData.GradeSource.getSource(env, rate);
        DataStream<Tuple2<String, Integer>> salaries = WindowJoinSampleData.SalarySource.getSource(env, rate);
        DataStream<Tuple3<String, Integer, Integer>> joinedStream = WindowJoin.runWindowJoin(grades, salaries, windowSize);
        joinedStream.print().setParallelism(1);
        env.execute("Windowed Join Example");
    }

    public static DataStream<Tuple3<String, Integer, Integer>> runWindowJoin(DataStream<Tuple2<String, Integer>> grades, DataStream<Tuple2<String, Integer>> salaries, long windowSize) {
        return grades.join(salaries).where((KeySelector)new NameKeySelector()).equalTo((KeySelector)new NameKeySelector()).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.milliseconds((long)windowSize))).apply((JoinFunction)new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple3<String, Integer, Integer>>(){

            public Tuple3<String, Integer, Integer> join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
                return new Tuple3(first.f0, first.f1, second.f1);
            }
        });
    }

    private static class NameKeySelector
    implements KeySelector<Tuple2<String, Integer>, String> {
        private NameKeySelector() {
        }

        public String getKey(Tuple2<String, Integer> value) {
            return (String)value.f0;
        }
    }
}

