/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class IntervalJoinITCase {
    private static List<String> testResults;

    /** Replaces the shared result list with a fresh, empty list before every test method. */
    @Before
    public void setup() {
        testResults = new ArrayList<>();
    }

    /**
     * Joins two identical streams that share the single key {@code "key"} over the interval
     * {@code [0 ms, 0 ms]}.
     *
     * <p>Timestamps are taken from the integer field of each tuple, so the expected output pairs
     * every element only with the element carrying the same value on the other side.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testCanJoinOverSameKey() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<Tuple2<String, Integer>, String> streamOne =
                env.fromElements(
                                Tuple2.of("key", 0),
                                Tuple2.of("key", 1),
                                Tuple2.of("key", 2),
                                Tuple2.of("key", 3),
                                Tuple2.of("key", 4),
                                Tuple2.of("key", 5))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
                        .keyBy(new Tuple2KeyExtractor());

        KeyedStream<Tuple2<String, Integer>, String> streamTwo =
                env.fromElements(
                                Tuple2.of("key", 0),
                                Tuple2.of("key", 1),
                                Tuple2.of("key", 2),
                                Tuple2.of("key", 3),
                                Tuple2.of("key", 4),
                                Tuple2.of("key", 5))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
                        .keyBy(new Tuple2KeyExtractor());

        streamOne
                .intervalJoin(streamTwo)
                .between(Time.milliseconds(0), Time.milliseconds(0))
                .process(
                        new ProcessJoinFunction<
                                Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {

                            @Override
                            public void processElement(
                                    Tuple2<String, Integer> left,
                                    Tuple2<String, Integer> right,
                                    Context ctx,
                                    Collector<String> out)
                                    throws Exception {
                                // Emit "<left>:<right>" using the tuples' toString() form.
                                out.collect(left + ":" + right);
                            }
                        })
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder(
                "(key,0):(key,0)",
                "(key,1):(key,1)",
                "(key,2):(key,2)",
                "(key,3):(key,3)",
                "(key,4):(key,4)",
                "(key,5):(key,5)");
    }

    /**
     * Joins two identical streams whose elements alternate between {@code "key1"} and
     * {@code "key2"} over the interval {@code [0 ms, 1 ms]}.
     *
     * <p>Within each key the timestamps are two apart, so the expected output contains only pairs
     * of equal elements; no pair combines elements of different keys.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testJoinsCorrectlyWithMultipleKeys() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        KeyedStream<Tuple2<String, Integer>, String> streamOne =
                env.fromElements(
                                Tuple2.of("key1", 0),
                                Tuple2.of("key2", 1),
                                Tuple2.of("key1", 2),
                                Tuple2.of("key2", 3),
                                Tuple2.of("key1", 4),
                                Tuple2.of("key2", 5))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
                        .keyBy(new Tuple2KeyExtractor());

        KeyedStream<Tuple2<String, Integer>, String> streamTwo =
                env.fromElements(
                                Tuple2.of("key1", 0),
                                Tuple2.of("key2", 1),
                                Tuple2.of("key1", 2),
                                Tuple2.of("key2", 3),
                                Tuple2.of("key1", 4),
                                Tuple2.of("key2", 5))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor())
                        .keyBy(new Tuple2KeyExtractor());

        streamOne
                .intervalJoin(streamTwo)
                .between(Time.milliseconds(0), Time.milliseconds(1))
                .process(new CombineToStringJoinFunction())
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder(
                "(key1,0):(key1,0)",
                "(key2,1):(key2,1)",
                "(key1,2):(key1,2)",
                "(key2,3):(key2,3)",
                "(key1,4):(key1,4)",
                "(key2,5):(key2,5)");
    }

    /**
     * Adds a source to {@code env} whose {@code run} method simply hands the source context to
     * {@code sourceConsumer}.
     *
     * @param env environment the source is added to
     * @param sourceConsumer callback that receives the source context when the source runs
     * @return the stream produced by the newly added source
     */
    private DataStream<Tuple2<String, Integer>> buildSourceStream(
            StreamExecutionEnvironment env, final SourceConsumer sourceConsumer) {
        return env.addSource(
                new SourceFunction<Tuple2<String, Integer>>() {

                    @Override
                    public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) {
                        sourceConsumer.accept(ctx);
                    }

                    @Override
                    public void cancel() {
                        // Nothing to do: run() holds no loop or resource of its own.
                    }
                });
    }

    /**
     * Joins two single-key streams whose elements are emitted out of timestamp order over the
     * interval {@code [-1 ms, 1 ms]}.
     *
     * <p>Both sources assign timestamps explicitly via {@code collectWithTimestamp} and emit a
     * watermark of 5 after the first five elements. The expected output contains every pair whose
     * values differ by at most one.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Integer>> streamOne =
                env.addSource(
                        new SourceFunction<Tuple2<String, Integer>>() {

                            @Override
                            public void run(
                                    SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) {
                                ctx.collectWithTimestamp(Tuple2.of("key", 5), 5L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
                                ctx.emitWatermark(new Watermark(5L));
                                ctx.collectWithTimestamp(Tuple2.of("key", 9), 9L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 8), 8L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 7), 7L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 6), 6L);
                            }

                            @Override
                            public void cancel() {
                                // Nothing to do: run() emits a fixed set of elements and returns.
                            }
                        });

        DataStreamSource<Tuple2<String, Integer>> streamTwo =
                env.addSource(
                        new SourceFunction<Tuple2<String, Integer>>() {

                            @Override
                            public void run(
                                    SourceFunction.SourceContext<Tuple2<String, Integer>> ctx) {
                                ctx.collectWithTimestamp(Tuple2.of("key", 2), 2L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 1), 1L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 3), 3L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 4), 4L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 5), 5L);
                                ctx.emitWatermark(new Watermark(5L));
                                ctx.collectWithTimestamp(Tuple2.of("key", 8), 8L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 7), 7L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 9), 9L);
                                ctx.collectWithTimestamp(Tuple2.of("key", 6), 6L);
                            }

                            @Override
                            public void cancel() {
                                // Nothing to do: run() emits a fixed set of elements and returns.
                            }
                        });

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(Time.milliseconds(-1), Time.milliseconds(1))
                .process(new CombineToStringJoinFunction())
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder(
                "(key,1):(key,1)",
                "(key,1):(key,2)",
                "(key,2):(key,1)",
                "(key,2):(key,2)",
                "(key,2):(key,3)",
                "(key,3):(key,2)",
                "(key,3):(key,3)",
                "(key,3):(key,4)",
                "(key,4):(key,3)",
                "(key,4):(key,4)",
                "(key,4):(key,5)",
                "(key,5):(key,4)",
                "(key,5):(key,5)",
                "(key,5):(key,6)",
                "(key,6):(key,5)",
                "(key,6):(key,6)",
                "(key,6):(key,7)",
                "(key,7):(key,6)",
                "(key,7):(key,7)",
                "(key,7):(key,8)",
                "(key,8):(key,7)",
                "(key,8):(key,8)",
                "(key,8):(key,9)",
                "(key,9):(key,8)",
                "(key,9):(key,9)");
    }

    /**
     * Expects a {@link NullPointerException} when the upper bound passed to {@code between} is
     * {@code null}. The job is only defined, never executed.
     */
    @Test(expected = NullPointerException.class)
    public void testFailsWithoutUpperBound() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
        DataStreamSource<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(Time.milliseconds(0), null);
    }

    /**
     * Expects a {@link NullPointerException} when the lower bound passed to {@code between} is
     * {@code null}. The job is only defined, never executed.
     */
    @Test(expected = NullPointerException.class)
    public void testFailsWithoutLowerBound() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
        DataStreamSource<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(null, Time.milliseconds(1));
    }

    /**
     * Joins two identical three-element streams over the interval {@code (0 ms, 2 ms)} with both
     * bounds marked exclusive.
     *
     * <p>The expected output contains only the pairs whose right value is exactly one greater
     * than the left value.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testBoundsCanBeExclusive() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOne =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamTwo =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(Time.milliseconds(0), Time.milliseconds(2))
                .upperBoundExclusive()
                .lowerBoundExclusive()
                .process(new CombineToStringJoinFunction())
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder("(key,0):(key,1)", "(key,1):(key,2)");
    }

    /**
     * Joins two identical three-element streams over the interval {@code [0 ms, 2 ms]} without
     * marking either bound exclusive.
     *
     * <p>The expected output contains every pair whose right value is between zero and two greater
     * than the left value, including both ends. The pipeline built here is the same as in
     * {@code testBoundsAreInclusiveByDefault}.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testBoundsCanBeInclusive() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOne =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamTwo =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(Time.milliseconds(0), Time.milliseconds(2))
                .process(new CombineToStringJoinFunction())
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder(
                "(key,0):(key,0)",
                "(key,0):(key,1)",
                "(key,0):(key,2)",
                "(key,1):(key,1)",
                "(key,1):(key,2)",
                "(key,2):(key,2)");
    }

    /**
     * Checks that an interval join configured only through {@code between(0 ms, 2 ms)} treats both
     * bounds as inclusive.
     *
     * <p>The expected output includes the pairs at offset zero and at offset two, which would be
     * absent if either bound were exclusive.
     *
     * @throws Exception if job execution fails
     */
    @Test
    public void testBoundsAreInclusiveByDefault() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamOne =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        SingleOutputStreamOperator<Tuple2<String, Integer>> streamTwo =
                env.fromElements(Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2))
                        .assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor());

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .between(Time.milliseconds(0), Time.milliseconds(2))
                .process(new CombineToStringJoinFunction())
                .addSink(new ResultSink());

        env.execute();

        expectInAnyOrder(
                "(key,0):(key,0)",
                "(key,0):(key,1)",
                "(key,0):(key,2)",
                "(key,1):(key,1)",
                "(key,1):(key,2)",
                "(key,2):(key,2)");
    }

    /**
     * Expects an {@link UnsupportedTimeCharacteristicException} when an interval join is switched
     * to processing time via {@code inProcessingTime()}.
     *
     * <p>The job is only defined here; {@code env.execute()} is never called.
     *
     * @throws Exception declared for the expected failure
     */
    @Test(expected = UnsupportedTimeCharacteristicException.class)
    public void testExecutionFailsInProcessingTime() throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Tuple2<String, Integer>> streamOne = env.fromElements(Tuple2.of("1", 1));
        DataStreamSource<Tuple2<String, Integer>> streamTwo = env.fromElements(Tuple2.of("1", 1));

        streamOne
                .keyBy(new Tuple2KeyExtractor())
                .intervalJoin(streamTwo.keyBy(new Tuple2KeyExtractor()))
                .inProcessingTime()
                .between(Time.milliseconds(0), Time.milliseconds(0))
                .process(
                        new ProcessJoinFunction<
                                Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {

                            @Override
                            public void processElement(
                                    Tuple2<String, Integer> left,
                                    Tuple2<String, Integer> right,
                                    Context ctx,
                                    Collector<String> out)
                                    throws Exception {
                                out.collect(left + ":" + right);
                            }
                        });
    }

    /**
     * Asserts that the collected results equal {@code expected}, ignoring order.
     *
     * <p>Both the expected values and the shared {@code testResults} list are sorted before being
     * compared; {@code testResults} is sorted in place.
     *
     * @param expected the strings the sink is expected to have received
     */
    private static void expectInAnyOrder(String... expected) {
        ArrayList<String> listExpected = Lists.newArrayList(expected);
        Collections.sort(listExpected);
        Collections.sort(testResults);
        Assert.assertEquals(listExpected, testResults);
    }

    private static class Tuple2KeyExtractor
    implements KeySelector<Tuple2<String, Integer>, String> {
        private Tuple2KeyExtractor() {
        }

        public String getKey(Tuple2<String, Integer> value) throws Exception {
            return (String)value.f0;
        }
    }

    private static class CombineToStringJoinFunction
    extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private CombineToStringJoinFunction() {
        }

        public void processElement(Tuple2<String, Integer> left, Tuple2<String, Integer> right, ProcessJoinFunction.Context ctx, Collector<String> out) {
            out.collect((Object)(left + ":" + right));
        }
    }

    private static class ResultSink
    implements SinkFunction<String> {
        private ResultSink() {
        }

        public void invoke(String value, SinkFunction.Context context) throws Exception {
            testResults.add(value);
        }
    }

    private static class AscendingTuple2TimestampExtractor
    extends AscendingTimestampExtractor<Tuple2<String, Integer>> {
        private AscendingTuple2TimestampExtractor() {
        }

        public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
            return ((Integer)element.f1).intValue();
        }
    }

    private static interface SourceConsumer
    extends Serializable,
    Consumer<SourceFunction.SourceContext<Tuple2<String, Integer>>> {
        public static final long serialVersionUID = 1L;
    }
}

