/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.lookup;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.table.runtime.operators.TableKeyedAsyncWaitOperator;
import org.apache.flink.table.runtime.operators.TableKeyedAsyncWaitOperatorFactory;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.Epoch;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.EpochManager;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.TableAsyncExecutionController;
import org.apache.flink.table.runtime.util.AsyncKeyOrderedTestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.MapAssert;
import org.junit.jupiter.api.Test;

public class TableKeyedAsyncWaitOperatorTest {
    private static final long TIMEOUT = 1000L;
    private static final KeySelector<Integer, Integer> keySelector = (KeySelector & Serializable)value -> value;
    private final AsyncWaitOperatorTest.MyAsyncFunction noLockAsyncDouble2Function = new AsyncWaitOperatorTest.MyAsyncFunction();

    @Test
    void testMultiKeysWithWatermark() throws Exception {
        AsyncWaitOperatorTest.LazyAsyncFunction lazyAsyncFunction = new AsyncWaitOperatorTest.LazyAsyncFunction();
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(lazyAsyncFunction, 1000L, 10);){
            testHarness.open();
            lazyAsyncFunction.countDown();
            TableKeyedAsyncWaitOperator operator = (TableKeyedAsyncWaitOperator)testHarness.getOperator();
            TableAsyncExecutionController aec = operator.getAsyncExecutionController();
            long initialTime = 0L;
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            testHarness.processElement(new StreamRecord((Object)0, 1L));
            testHarness.processElement(new StreamRecord((Object)1, 2L));
            testHarness.processWatermark(new Watermark(2L));
            Epoch epoch = new Epoch(new Watermark(2L));
            testHarness.processElement(new StreamRecord((Object)0, 3L));
            epoch.incrementCount();
            testHarness.processElement(new StreamRecord((Object)4, 4L));
            epoch.incrementCount();
            testHarness.processElement(new StreamRecord((Object)3, 5L));
            epoch.incrementCount();
            Assertions.assertThat((Object)aec.getActiveEpoch()).isEqualTo((Object)epoch);
            testHarness.processWatermark(new Watermark(5L));
            epoch = new Epoch(new Watermark(5L));
            testHarness.processElement(new StreamRecord((Object)2, 6L));
            epoch.incrementCount();
            testHarness.processElement(new StreamRecord((Object)1, 7L));
            epoch.incrementCount();
            testHarness.processElement(new StreamRecord((Object)0, 8L));
            epoch.incrementCount();
            Assertions.assertThat((Object)aec.getActiveEpoch()).isEqualTo((Object)epoch);
            testHarness.processWatermark(new Watermark(8L));
            testHarness.endInput();
            epoch = new Epoch(new Watermark(8L));
            Assertions.assertThat((Object)aec.getActiveEpoch()).isEqualTo((Object)epoch);
            expectedOutput.add(new StreamRecord((Object)0, 1L));
            expectedOutput.add(new StreamRecord((Object)1, 2L));
            expectedOutput.add(new Watermark(2L));
            expectedOutput.add(new StreamRecord((Object)0, 3L));
            expectedOutput.add(new StreamRecord((Object)4, 4L));
            expectedOutput.add(new StreamRecord((Object)3, 5L));
            expectedOutput.add(new Watermark(5L));
            expectedOutput.add(new StreamRecord((Object)2, 6L));
            expectedOutput.add(new StreamRecord((Object)1, 7L));
            expectedOutput.add(new StreamRecord((Object)0, 8L));
            expectedOutput.add(new Watermark(8L));
            LinkedList<StreamRecord> expected = new LinkedList<StreamRecord>(Arrays.asList(new StreamRecord((Object)0, 1L), new StreamRecord((Object)0, 3L), new StreamRecord((Object)0, 8L)));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expected);
            expected = new LinkedList<StreamRecord>(Arrays.asList(new StreamRecord((Object)1, 2L), new StreamRecord((Object)1, 7L)));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expected);
            TableKeyedAsyncWaitOperatorTest.assertWatermarkEquals("Output watermark was not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testOneKeyWithWatermark() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(this.noLockAsyncDouble2Function, 1000L, 10);){
            long initialTime = 0L;
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            testHarness.open();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)1, 2L));
            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(new StreamRecord((Object)1, 3L));
            testHarness.endInput();
            expectedOutput.add(new StreamRecord((Object)2, 1L));
            expectedOutput.add(new StreamRecord((Object)2, 2L));
            expectedOutput.add(new Watermark(2L));
            expectedOutput.add(new StreamRecord((Object)2, 3L));
            TestHarnessUtil.assertOutputEquals((String)"output is not correct.", expectedOutput, (Queue)testHarness.getOutput());
        }
    }

    @Test
    void testKeysProcessingWithTheSameKeysPending() throws Exception {
        AsyncWaitOperatorTest.LazyAsyncFunction lazyAsyncFunction = new AsyncWaitOperatorTest.LazyAsyncFunction();
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(lazyAsyncFunction, 1000L, 10);){
            long initialTime = 0L;
            ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
            testHarness.open();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)2, 1L));
            testHarness.processElement(new StreamRecord((Object)3, 1L));
            testHarness.processElement(new StreamRecord((Object)1, 2L));
            testHarness.processElement(new StreamRecord((Object)2, 2L));
            testHarness.processWatermark(new Watermark(2L));
            TableAsyncExecutionController aec = ((TableKeyedAsyncWaitOperator)testHarness.getOperator()).getAsyncExecutionController();
            Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(2);
            Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(3);
            lazyAsyncFunction.countDown();
            testHarness.endInput();
            Assertions.assertThat((int)aec.getBlockingSize()).isEqualTo(0);
            Assertions.assertThat((int)aec.getInFlightSize()).isEqualTo(0);
            expectedOutput.add(new StreamRecord((Object)1, 1L));
            expectedOutput.add(new StreamRecord((Object)1, 2L));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expectedOutput);
            expectedOutput.clear();
            expectedOutput.add(new StreamRecord((Object)2, 1L));
            expectedOutput.add(new StreamRecord((Object)2, 2L));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expectedOutput);
        }
    }

    @Test
    void testMultiKeysWithWatermarkWithDifferentCapacity() throws Exception {
        for (int capacity = 1; capacity < 10; ++capacity) {
            this.testMultiKeysMixWatermark(capacity);
        }
    }

    private void testMultiKeysMixWatermark(int capacity) throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(this.noLockAsyncDouble2Function, 1000L, capacity);){
            testHarness.open();
            long initialTime = 0L;
            ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)1, 2L));
            testHarness.processWatermark(new Watermark(2L));
            testHarness.processElement(new StreamRecord((Object)2, 3L));
            testHarness.processElement(new StreamRecord((Object)3, 4L));
            testHarness.processElement(new StreamRecord((Object)2, 5L));
            testHarness.processElement(new StreamRecord((Object)3, 6L));
            testHarness.processWatermark(new Watermark(7L));
            testHarness.processWatermark(new Watermark(8L));
            testHarness.processElement(new StreamRecord((Object)2, 9L));
            testHarness.processElement(new StreamRecord((Object)3, 10L));
            testHarness.endInput();
            expectedOutput.add(new StreamRecord((Object)2, 1L));
            expectedOutput.add(new StreamRecord((Object)2, 2L));
            expectedOutput.add(new Watermark(2L));
            expectedOutput.add(new StreamRecord((Object)4, 3L));
            expectedOutput.add(new StreamRecord((Object)6, 4L));
            expectedOutput.add(new StreamRecord((Object)4, 5L));
            expectedOutput.add(new StreamRecord((Object)6, 6L));
            expectedOutput.add(new Watermark(7L));
            expectedOutput.add(new Watermark(8L));
            expectedOutput.add(new StreamRecord((Object)4, 9L));
            expectedOutput.add(new StreamRecord((Object)6, 10L));
            LinkedList<StreamRecord> expected = new LinkedList<StreamRecord>(Arrays.asList(new StreamRecord((Object)4, 3L), new StreamRecord((Object)4, 5L)));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expected);
            expected = new LinkedList<StreamRecord>(Arrays.asList(new StreamRecord((Object)6, 4L), new StreamRecord((Object)6, 6L)));
            AsyncKeyOrderedTestUtils.assertKeyOrdered(testHarness.getOutput(), expected);
            TableKeyedAsyncWaitOperatorTest.assertWatermarkEquals("Output watermark is not correct.", expectedOutput, testHarness.getOutput());
        }
    }

    @Test
    void testMultiKeysWithoutWatermark() throws Exception {
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(this.noLockAsyncDouble2Function, 1000L, 10);){
            long initialTime = 0L;
            ConcurrentLinkedQueue<StreamRecord> expectedOutput = new ConcurrentLinkedQueue<StreamRecord>();
            testHarness.open();
            testHarness.processElement(new StreamRecord((Object)1, 1L));
            testHarness.processElement(new StreamRecord((Object)2, 2L));
            testHarness.processElement(new StreamRecord((Object)3, 3L));
            testHarness.processElement(new StreamRecord((Object)4, 4L));
            testHarness.processElement(new StreamRecord((Object)5, 5L));
            testHarness.processElement(new StreamRecord((Object)6, 6L));
            testHarness.processElement(new StreamRecord((Object)7, 7L));
            testHarness.processElement(new StreamRecord((Object)8, 8L));
            expectedOutput.add(new StreamRecord((Object)2, 1L));
            expectedOutput.add(new StreamRecord((Object)4, 2L));
            expectedOutput.add(new StreamRecord((Object)6, 3L));
            expectedOutput.add(new StreamRecord((Object)8, 4L));
            expectedOutput.add(new StreamRecord((Object)10, 5L));
            expectedOutput.add(new StreamRecord((Object)12, 6L));
            expectedOutput.add(new StreamRecord((Object)14, 7L));
            expectedOutput.add(new StreamRecord((Object)16, 8L));
            testHarness.endInput();
            TestHarnessUtil.assertOutputEqualsSorted((String)"Output is not correct.", expectedOutput, (Iterable)testHarness.getOutput(), (Comparator)new StreamRecordComparator());
        }
    }

    @Test
    public void testSnapshotAndRestore() throws Exception {
        AsyncWaitOperatorTest.LazyAsyncFunction testLazyAsyncFunction = new AsyncWaitOperatorTest.LazyAsyncFunction();
        KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(testLazyAsyncFunction, 1000L, 10);
        testHarness.open();
        long initialTime = 0L;
        testHarness.processElement(new StreamRecord((Object)1, 1L));
        testHarness.processElement(new StreamRecord((Object)2, 2L));
        testHarness.processElement(new StreamRecord((Object)3, 3L));
        testHarness.processElement(new StreamRecord((Object)4, 4L));
        Assertions.assertThat((Collection)testHarness.getOutput()).isEmpty();
        OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0L);
        testLazyAsyncFunction.countDown();
        testHarness.endInput();
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord((Object)1, 1L));
        expected.add(new StreamRecord((Object)2, 2L));
        expected.add(new StreamRecord((Object)3, 3L));
        expected.add(new StreamRecord((Object)4, 4L));
        testHarness.getOutput().removeIf(record -> record instanceof CheckpointBarrier);
        TestHarnessUtil.assertOutputEqualsSorted((String)"StateAndRestored Test Output was not correct before restore.", expected, (Iterable)testHarness.getOutput(), (Comparator)new StreamRecordComparator());
        testHarness.close();
        testLazyAsyncFunction = new AsyncWaitOperatorTest.LazyAsyncFunction();
        testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(testLazyAsyncFunction, 1000L, 10);
        testHarness.initializeState(snapshot);
        testHarness.open();
        Optional<Epoch<Integer>> epochWithMinWatermark = this.unwrapEpoch(testHarness, Long.MIN_VALUE);
        Assertions.assertThat(epochWithMinWatermark).isPresent();
        Assertions.assertThat((int)epochWithMinWatermark.get().getOngoingRecordCount()).isEqualTo(4);
        testHarness.processWatermark(1000L);
        testHarness.processElement(new StreamRecord((Object)5, 5L));
        testHarness.processElement(new StreamRecord((Object)6, 6L));
        testHarness.processElement(new StreamRecord((Object)7, 7L));
        testHarness.processElement(new StreamRecord((Object)8, 8L));
        Optional<Epoch<Integer>> epochWithMidWatermark = this.unwrapEpoch(testHarness, 1000L);
        Assertions.assertThat(epochWithMidWatermark).isPresent();
        Assertions.assertThat((int)epochWithMidWatermark.get().getOngoingRecordCount()).isEqualTo(4);
        testHarness.processWatermark(Long.MAX_VALUE);
        Optional<Epoch<Integer>> epochWithMaxWatermark = this.unwrapEpoch(testHarness, Long.MAX_VALUE);
        Assertions.assertThat(epochWithMaxWatermark).isPresent();
        Assertions.assertThat((int)epochWithMaxWatermark.get().getOngoingRecordCount()).isEqualTo(0);
        expected.add(new Watermark(1000L));
        expected.add(new StreamRecord((Object)5, 5L));
        expected.add(new StreamRecord((Object)6, 6L));
        expected.add(new StreamRecord((Object)7, 7L));
        expected.add(new StreamRecord((Object)8, 8L));
        expected.add(new Watermark(Long.MAX_VALUE));
        testLazyAsyncFunction.countDown();
        testHarness.endInput();
        Assertions.assertThat((int)this.unwrapEpochManager(testHarness).getOutputQueue().size()).isEqualTo(1);
        Assertions.assertThat((long)((Epoch)this.unwrapEpochManager(testHarness).getOutputQueue().get(0)).getWatermark().getTimestamp()).isEqualTo(Long.MAX_VALUE);
        TestHarnessUtil.assertOutputEqualsSorted((String)"StateAndRestored Test Output was not correct.", expected, (Iterable)testHarness.getOutput(), (Comparator)new StreamRecordComparator());
        testHarness.close();
    }

    @Test
    public void testKeyedAsyncTimeoutFailure() throws Exception {
        this.testKeyedAsyncTimeout(new AsyncWaitOperatorTest.LazyAsyncFunction(), Optional.of(TimeoutException.class), List.of(new StreamRecord((Object)2, 5L)));
    }

    @Test
    public void testKeyedAsyncTimeoutIgnore() throws Exception {
        this.testKeyedAsyncTimeout((AsyncWaitOperatorTest.LazyAsyncFunction)new AsyncWaitOperatorTest.IgnoreTimeoutLazyAsyncFunction(), Optional.empty(), List.of(new StreamRecord((Object)3, 0L), new StreamRecord((Object)2, 5L)));
    }

    private void testKeyedAsyncTimeout(AsyncWaitOperatorTest.LazyAsyncFunction lazyAsyncFunction, Optional<Class<? extends Throwable>> expectedException, List<StreamRecord<Integer>> expectedRecords) throws Exception {
        long timeout = 10L;
        try (KeyedOneInputStreamOperatorTestHarness testHarness = TableKeyedAsyncWaitOperatorTest.createKeyedTestHarness(lazyAsyncFunction, 10L, 10);){
            testHarness.open();
            MockEnvironment mockEnvironment = testHarness.getEnvironment();
            mockEnvironment.setExpectedExternalFailureCause(Throwable.class);
            long initialTime = 0L;
            testHarness.setProcessingTime(0L);
            testHarness.processElement(new StreamRecord((Object)1, 0L));
            testHarness.setProcessingTime(5L);
            testHarness.processElement(new StreamRecord((Object)2, 5L));
            testHarness.setProcessingTime(11L);
            lazyAsyncFunction.countDown();
            testHarness.endInput();
            ConcurrentLinkedQueue<StreamRecord<Integer>> expectedOutput = new ConcurrentLinkedQueue<StreamRecord<Integer>>(expectedRecords);
            TestHarnessUtil.assertOutputEquals((String)"Output is not correct.", expectedOutput, (Queue)testHarness.getOutput());
            if (expectedException.isPresent()) {
                org.junit.jupiter.api.Assertions.assertTrue((boolean)mockEnvironment.getActualExternalFailureCause().isPresent());
                org.junit.jupiter.api.Assertions.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)((Throwable)mockEnvironment.getActualExternalFailureCause().get()), expectedException.get()).isPresent());
            }
        }
    }

    private static <OUT> KeyedOneInputStreamOperatorTestHarness<Integer, Integer, OUT> createKeyedTestHarness(AsyncFunction<Integer, OUT> function, long timeout, int capacity) throws Exception {
        return new KeyedOneInputStreamOperatorTestHarness((StreamOperatorFactory)new TableKeyedAsyncWaitOperatorFactory(function, keySelector, timeout, capacity), keySelector, (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, (TypeSerializer)IntSerializer.INSTANCE);
    }

    private static <T> void assertWatermarkEquals(String message, Queue<T> expected, Queue<T> actual) {
        ArrayList<T> expectedList = new ArrayList<T>(expected);
        ArrayList<T> actualList = new ArrayList<T>(actual);
        Function<List, Map> extractWatermarks = list -> IntStream.range(0, list.size()).boxed().filter(i -> list.get((int)i) instanceof Watermark).collect(Collectors.toMap(i -> i, i -> (Watermark)list.get((int)i)));
        Map expectedWatermarks = extractWatermarks.apply(expectedList);
        Map actualWatermarks = extractWatermarks.apply(actualList);
        ((MapAssert)Assertions.assertThat((Map)actualWatermarks).as(message, new Object[0])).isEqualTo((Object)expectedWatermarks);
    }

    private EpochManager<Integer> unwrapEpochManager(KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness) {
        TableKeyedAsyncWaitOperator operator = (TableKeyedAsyncWaitOperator)testHarness.getOperator();
        TableAsyncExecutionController asyncExecutionController = operator.getAsyncExecutionController();
        return asyncExecutionController.getEpochManager();
    }

    private Optional<Epoch<Integer>> unwrapEpoch(KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer> testHarness, long targetEpochWatermark) {
        return this.unwrapEpochManager(testHarness).getProperEpoch(new Watermark(targetEpochWatermark));
    }

    private static class StreamRecordComparator
    implements Comparator<Object> {
        private StreamRecordComparator() {
        }

        @Override
        public int compare(Object o1, Object o2) {
            StreamRecord sr0 = (StreamRecord)o1;
            StreamRecord sr1 = (StreamRecord)o2;
            if (sr0.getTimestamp() != sr1.getTimestamp()) {
                return (int)(sr0.getTimestamp() - sr1.getTimestamp());
            }
            int comparison = ((Integer)sr0.getValue()).compareTo((Integer)sr1.getValue());
            if (comparison != 0) {
                return comparison;
            }
            return (Integer)sr0.getValue() - (Integer)sr1.getValue();
        }
    }
}

