package org.apache.flink.asyncprocessing.operators;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
import org.apache.flink.runtime.asyncprocessing.declare.NamedCallback;
import org.apache.flink.runtime.asyncprocessing.declare.NamedConsumer;
import org.apache.flink.runtime.asyncprocessing.declare.NamedFunction;
import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest.class */
public class AsyncKeyedProcessOperatorTest {

    /* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest$TestChainDeclarationFunction.class */
    private static class TestChainDeclarationFunction extends TestDeclarationFunctionBase {
        private TestChainDeclarationFunction() {
        }

        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(DeclarationContext declarationContext, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context context, Collector<String> collector) throws DeclarationException {
            ContextVariable declareVariable = declarationContext.declareVariable((Supplier) null);
            return declarationContext.declareChain().thenCompose(tuple2 -> {
                if (declareVariable.get() == null) {
                    declareVariable.set((Integer) tuple2.f0);
                }
                this.value.addAndGet(((Integer) tuple2.f0).intValue());
                return StateFutureUtils.completedVoidFuture();
            }).thenCompose(obj -> {
                return StateFutureUtils.completedFuture(Integer.valueOf(this.value.incrementAndGet()));
            }).withName("adder").thenAccept(num -> {
                this.value.addAndGet(((Integer) declareVariable.get()).intValue());
                collector.collect(String.valueOf(this.value.get()));
            }).withName("doubler").finish();
        }
    }

    /* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest$TestDeclarationFunctionBase.class */
    private static abstract class TestDeclarationFunctionBase extends DeclaringAsyncKeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
        final AtomicInteger value = new AtomicInteger(0);

        private TestDeclarationFunctionBase() {
        }

        public int getValue() {
            return this.value.get();
        }
    }

    /* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest$TestNormalDeclarationFunction.class */
    private static class TestNormalDeclarationFunction extends TestDeclarationFunctionBase {
        private TestNormalDeclarationFunction() {
        }

        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(DeclarationContext declarationContext, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context context, Collector<String> collector) throws DeclarationException {
            ContextVariable declareVariable = declarationContext.declareVariable((Supplier) null);
            NamedFunction declare = declarationContext.declare("adder", r3 -> {
                return StateFutureUtils.completedFuture(Integer.valueOf(this.value.incrementAndGet()));
            });
            NamedConsumer declare2 = declarationContext.declare("doubler", num -> {
                this.value.addAndGet(((Integer) declareVariable.get()).intValue());
                collector.collect(String.valueOf(this.value.get()));
            });
            Assertions.assertThat(declare).isInstanceOf(NamedCallback.class);
            Assertions.assertThat(declare2).isInstanceOf(NamedCallback.class);
            return tuple2 -> {
                if (declareVariable.get() == null) {
                    declareVariable.set((Integer) tuple2.f0);
                }
                this.value.addAndGet(((Integer) tuple2.f0).intValue());
                StateFutureUtils.completedVoidFuture().thenCompose(declare).thenAccept(declare2);
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest$TestNotDeclarationFunction.class */
    private static class TestNotDeclarationFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
        final AtomicInteger value = new AtomicInteger(0);

        private TestNotDeclarationFunction() {
        }

        public int getValue() {
            return this.value.get();
        }

        public void processElement(Tuple2<Integer, String> tuple2, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context context, Collector<String> collector) throws Exception {
            context.timerService().registerEventTimeTimer(((Integer) tuple2.f0).intValue());
        }

        public void onTimer(long j, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.OnTimerContext onTimerContext, Collector<String> collector) throws DeclarationException {
            this.value.addAndGet((int) j);
            this.value.incrementAndGet();
            this.value.addAndGet((int) j);
            collector.collect(String.valueOf(this.value.get()));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            processElement((Tuple2<Integer, String>) obj, (KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context) context, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest$TestTimerDeclarationFunction.class */
    private static class TestTimerDeclarationFunction extends TestDeclarationFunctionBase {
        private TestTimerDeclarationFunction() {
        }

        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(DeclarationContext declarationContext, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context context, Collector<String> collector) throws DeclarationException {
            return declarationContext.declareChain().thenCompose(tuple2 -> {
                context.timerService().registerEventTimeTimer(((Integer) tuple2.f0).intValue());
                return StateFutureUtils.completedVoidFuture();
            }).finish();
        }

        public ThrowingConsumer<Long, Exception> declareOnTimer(DeclarationContext declarationContext, KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.OnTimerContext onTimerContext, Collector<String> collector) throws DeclarationException {
            ContextVariable declareVariable = declarationContext.declareVariable((Supplier) null);
            return declarationContext.declareChain().thenCompose(l -> {
                if (declareVariable.get() == null) {
                    declareVariable.set(Integer.valueOf(l.intValue()));
                }
                this.value.addAndGet(l.intValue());
                return StateFutureUtils.completedVoidFuture();
            }).thenCompose(obj -> {
                return StateFutureUtils.completedFuture(Integer.valueOf(this.value.incrementAndGet()));
            }).withName("adder").thenAccept(num -> {
                this.value.addAndGet(((Integer) declareVariable.get()).intValue());
                collector.collect(String.valueOf(this.value.get()));
            }).withName("doubler").finish();
        }
    }

    @ValueSource(booleans = {false, true})
    @ParameterizedTest(name = "Chain declaration = {0}")
    public void testNormalProcessor(boolean z) throws Exception {
        TestDeclarationFunctionBase testChainDeclarationFunction = z ? new TestChainDeclarationFunction() : new TestNormalDeclarationFunction();
        AsyncKeyedProcessOperator asyncKeyedProcessOperator = new AsyncKeyedProcessOperator(testChainDeclarationFunction);
        ArrayList arrayList = new ArrayList();
        AsyncKeyedOneInputStreamOperatorTestHarness create = AsyncKeyedOneInputStreamOperatorTestHarness.create(asyncKeyedProcessOperator, tuple2 -> {
            return (Integer) tuple2.f0;
        }, TypeInformation.of(Integer.class));
        try {
            create.open();
            create.processElement(new StreamRecord(Tuple2.of(5, "5")));
            arrayList.add(new StreamRecord("11"));
            Assertions.assertThat(testChainDeclarationFunction.getValue()).isEqualTo(11);
            create.processElement(new StreamRecord(Tuple2.of(6, "6")));
            arrayList.add(new StreamRecord("24"));
            Assertions.assertThat(testChainDeclarationFunction.getValue()).isEqualTo(24);
            Assertions.assertThat(create.getOutput()).containsExactly(arrayList.toArray());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testTimerProcessor() throws Exception {
        TestTimerDeclarationFunction testTimerDeclarationFunction = new TestTimerDeclarationFunction();
        AsyncKeyedProcessOperator asyncKeyedProcessOperator = new AsyncKeyedProcessOperator(testTimerDeclarationFunction);
        ArrayList arrayList = new ArrayList();
        AsyncKeyedOneInputStreamOperatorTestHarness create = AsyncKeyedOneInputStreamOperatorTestHarness.create(asyncKeyedProcessOperator, tuple2 -> {
            return (Integer) tuple2.f0;
        }, TypeInformation.of(Integer.class));
        try {
            create.open();
            create.processElement(new StreamRecord(Tuple2.of(5, "5")));
            create.processElement(new StreamRecord(Tuple2.of(6, "5")));
            Assertions.assertThat(testTimerDeclarationFunction.getValue()).isEqualTo(0);
            create.processWatermark(5L);
            arrayList.add(new StreamRecord("11", 5L));
            arrayList.add(new Watermark(5L));
            Assertions.assertThat(testTimerDeclarationFunction.getValue()).isEqualTo(11);
            create.processWatermark(6L);
            arrayList.add(new StreamRecord("24", 6L));
            arrayList.add(new Watermark(6L));
            Assertions.assertThat(testTimerDeclarationFunction.getValue()).isEqualTo(24);
            Assertions.assertThat(create.getOutput()).containsExactly(arrayList.toArray());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testNoDeclaredFunction() throws Exception {
        TestNotDeclarationFunction testNotDeclarationFunction = new TestNotDeclarationFunction();
        AsyncKeyedProcessOperator asyncKeyedProcessOperator = new AsyncKeyedProcessOperator(testNotDeclarationFunction);
        ArrayList arrayList = new ArrayList();
        AsyncKeyedOneInputStreamOperatorTestHarness create = AsyncKeyedOneInputStreamOperatorTestHarness.create(asyncKeyedProcessOperator, tuple2 -> {
            return (Integer) tuple2.f0;
        }, TypeInformation.of(Integer.class));
        try {
            create.open();
            create.processElement(new StreamRecord(Tuple2.of(5, "5")));
            create.processElement(new StreamRecord(Tuple2.of(6, "5")));
            Assertions.assertThat(testNotDeclarationFunction.getValue()).isEqualTo(0);
            create.processWatermark(5L);
            arrayList.add(new StreamRecord("11", 5L));
            arrayList.add(new Watermark(5L));
            Assertions.assertThat(testNotDeclarationFunction.getValue()).isEqualTo(11);
            create.processWatermark(6L);
            arrayList.add(new StreamRecord("24", 6L));
            arrayList.add(new Watermark(6L));
            Assertions.assertThat(testNotDeclarationFunction.getValue()).isEqualTo(24);
            Assertions.assertThat(create.getOutput()).containsExactly(arrayList.toArray());
            if (create != null) {
                create.close();
            }
        } catch (Throwable th) {
            if (create != null) {
                try {
                    create.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1775791994:
                if (implMethodName.equals("lambda$testNoDeclaredFunction$aeea360d$1")) {
                    z = false;
                    break;
                }
                break;
            case 603216137:
                if (implMethodName.equals("lambda$testNormalProcessor$83c7b7c3$1")) {
                    z = 2;
                    break;
                }
                break;
            case 813607948:
                if (implMethodName.equals("lambda$testTimerProcessor$aeea360d$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple2 -> {
                        return (Integer) tuple2.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple22 -> {
                        return (Integer) tuple22.f0;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/asyncprocessing/operators/AsyncKeyedProcessOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/api/java/tuple/Tuple2;)Ljava/lang/Integer;")) {
                    return tuple23 -> {
                        return (Integer) tuple23.f0;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
