package org.apache.flink.test.streaming.runtime;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.UnsupportedTimeCharacteristicException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase.class */
public class IntervalJoinITCase {
    private static List<String> testResults;

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase$AscendingTuple2TimestampExtractor.class */
    private static class AscendingTuple2TimestampExtractor extends AscendingTimestampExtractor<Tuple2<String, Integer>> {
        private AscendingTuple2TimestampExtractor() {
        }

        public long extractAscendingTimestamp(Tuple2<String, Integer> tuple2) {
            return ((Integer) tuple2.f1).intValue();
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase$CombineToStringJoinFunction.class */
    private static class CombineToStringJoinFunction extends ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String> {
        private CombineToStringJoinFunction() {
        }

        public void processElement(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22, ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context context, Collector<String> collector) {
            collector.collect(tuple2 + ":" + tuple22);
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, Object obj2, ProcessJoinFunction.Context context, Collector collector) throws Exception {
            processElement((Tuple2<String, Integer>) obj, (Tuple2<String, Integer>) obj2, (ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase$ResultSink.class */
    private static class ResultSink implements SinkFunction<String> {
        private ResultSink() {
        }

        public void invoke(String str, SinkFunction.Context context) throws Exception {
            IntervalJoinITCase.testResults.add(str);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase$SourceConsumer.class */
    private interface SourceConsumer extends Serializable, Consumer<SourceFunction.SourceContext<Tuple2<String, Integer>>> {
        public static final long serialVersionUID = 1;
    }

    /* loaded from: input_file:org/apache/flink/test/streaming/runtime/IntervalJoinITCase$Tuple2KeyExtractor.class */
    private static class Tuple2KeyExtractor implements KeySelector<Tuple2<String, Integer>, String> {
        private Tuple2KeyExtractor() {
        }

        public String getKey(Tuple2<String, Integer> tuple2) throws Exception {
            return (String) tuple2.f0;
        }
    }

    @Before
    public void setup() {
        testResults = new ArrayList();
    }

    @Test
    public void testCanJoinOverSameKey() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2), Tuple2.of("key", 3), Tuple2.of("key", 4), Tuple2.of("key", 5)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2), Tuple2.of("key", 3), Tuple2.of("key", 4), Tuple2.of("key", 5)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), Time.milliseconds(0L)).process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { // from class: org.apache.flink.test.streaming.runtime.IntervalJoinITCase.1
            public void processElement(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22, ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
                collector.collect(tuple2 + ":" + tuple22);
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, Object obj2, ProcessJoinFunction.Context context, Collector collector) throws Exception {
                processElement((Tuple2<String, Integer>) obj, (Tuple2<String, Integer>) obj2, (ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context) context, (Collector<String>) collector);
            }
        }).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key,0):(key,0)", "(key,1):(key,1)", "(key,2):(key,2)", "(key,3):(key,3)", "(key,4):(key,4)", "(key,5):(key,5)");
    }

    @Test
    public void testJoinsCorrectlyWithMultipleKeys() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key1", 0), Tuple2.of("key2", 1), Tuple2.of("key1", 2), Tuple2.of("key2", 3), Tuple2.of("key1", 4), Tuple2.of("key2", 5)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key1", 0), Tuple2.of("key2", 1), Tuple2.of("key1", 2), Tuple2.of("key2", 3), Tuple2.of("key1", 4), Tuple2.of("key2", 5)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), Time.milliseconds(1L)).process(new CombineToStringJoinFunction()).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key1,0):(key1,0)", "(key2,1):(key2,1)", "(key1,2):(key1,2)", "(key2,3):(key2,3)", "(key1,4):(key1,4)", "(key2,5):(key2,5)");
    }

    private DataStream<Tuple2<String, Integer>> buildSourceStream(StreamExecutionEnvironment streamExecutionEnvironment, final SourceConsumer sourceConsumer) {
        return streamExecutionEnvironment.addSource(new SourceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.IntervalJoinITCase.2
            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) {
                sourceConsumer.accept(sourceContext);
            }

            public void cancel() {
            }
        });
    }

    @Test
    public void testBoundedUnorderedStreamsStillJoinCorrectly() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.addSource(new SourceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.IntervalJoinITCase.3
            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) {
                sourceContext.collectWithTimestamp(Tuple2.of("key", 5), 5L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 1), 1L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 4), 4L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 3), 3L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 2), 2L);
                sourceContext.emitWatermark(new Watermark(5L));
                sourceContext.collectWithTimestamp(Tuple2.of("key", 9), 9L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 8), 8L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 7), 7L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 6), 6L);
            }

            public void cancel() {
            }
        }).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.addSource(new SourceFunction<Tuple2<String, Integer>>() { // from class: org.apache.flink.test.streaming.runtime.IntervalJoinITCase.4
            public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> sourceContext) {
                sourceContext.collectWithTimestamp(Tuple2.of("key", 2), 2L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 1), 1L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 3), 3L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 4), 4L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 5), 5L);
                sourceContext.emitWatermark(new Watermark(5L));
                sourceContext.collectWithTimestamp(Tuple2.of("key", 8), 8L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 7), 7L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 9), 9L);
                sourceContext.collectWithTimestamp(Tuple2.of("key", 6), 6L);
            }

            public void cancel() {
            }
        }).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(-1L), Time.milliseconds(1L)).process(new CombineToStringJoinFunction()).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key,1):(key,1)", "(key,1):(key,2)", "(key,2):(key,1)", "(key,2):(key,2)", "(key,2):(key,3)", "(key,3):(key,2)", "(key,3):(key,3)", "(key,3):(key,4)", "(key,4):(key,3)", "(key,4):(key,4)", "(key,4):(key,5)", "(key,5):(key,4)", "(key,5):(key,5)", "(key,5):(key,6)", "(key,6):(key,5)", "(key,6):(key,6)", "(key,6):(key,7)", "(key,7):(key,6)", "(key,7):(key,7)", "(key,7):(key,8)", "(key,8):(key,7)", "(key,8):(key,8)", "(key,8):(key,9)", "(key,9):(key,8)", "(key,9):(key,9)");
    }

    @Test(expected = NullPointerException.class)
    public void testFailsWithoutUpperBound() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), (Time) null);
    }

    @Test(expected = NullPointerException.class)
    public void testFailsWithoutLowerBound() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor())).between((Time) null, Time.milliseconds(1L));
    }

    @Test
    public void testBoundsCanBeExclusive() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), Time.milliseconds(2L)).upperBoundExclusive().lowerBoundExclusive().process(new CombineToStringJoinFunction()).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key,0):(key,1)", "(key,1):(key,2)");
    }

    @Test
    public void testBoundsCanBeInclusive() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), Time.milliseconds(2L)).process(new CombineToStringJoinFunction()).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key,0):(key,0)", "(key,0):(key,1)", "(key,0):(key,2)", "(key,1):(key,1)", "(key,1):(key,2)", "(key,2):(key,2)");
    }

    @Test
    public void testBoundsAreInclusiveByDefault() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("key", 0), Tuple2.of("key", 1), Tuple2.of("key", 2)}).assignTimestampsAndWatermarks(new AscendingTuple2TimestampExtractor()).keyBy(new Tuple2KeyExtractor())).between(Time.milliseconds(0L), Time.milliseconds(2L)).process(new CombineToStringJoinFunction()).addSink(new ResultSink());
        executionEnvironment.execute();
        expectInAnyOrder("(key,0):(key,0)", "(key,0):(key,1)", "(key,0):(key,2)", "(key,1):(key,1)", "(key,1):(key,2)", "(key,2):(key,2)");
    }

    @Test(expected = UnsupportedTimeCharacteristicException.class)
    public void testExecutionFailsInProcessingTime() throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);
        executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor()).intervalJoin(executionEnvironment.fromElements(new Tuple2[]{Tuple2.of("1", 1)}).keyBy(new Tuple2KeyExtractor())).inProcessingTime().between(Time.milliseconds(0L), Time.milliseconds(0L)).process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() { // from class: org.apache.flink.test.streaming.runtime.IntervalJoinITCase.5
            public void processElement(Tuple2<String, Integer> tuple2, Tuple2<String, Integer> tuple22, ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context context, Collector<String> collector) throws Exception {
                collector.collect(tuple2 + ":" + tuple22);
            }

            public /* bridge */ /* synthetic */ void processElement(Object obj, Object obj2, ProcessJoinFunction.Context context, Collector collector) throws Exception {
                processElement((Tuple2<String, Integer>) obj, (Tuple2<String, Integer>) obj2, (ProcessJoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>.Context) context, (Collector<String>) collector);
            }
        });
    }

    private static void expectInAnyOrder(String... strArr) {
        ArrayList newArrayList = Lists.newArrayList(strArr);
        Collections.sort(newArrayList);
        Collections.sort(testResults);
        Assert.assertEquals(newArrayList, testResults);
    }
}
