/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join.lookup.keyordered;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.DefaultOpenContext;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.async.CollectionSupplier;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.async.AsyncWaitOperatorTest;
import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.AecRecord;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.Epoch;
import org.apache.flink.table.runtime.operators.join.lookup.keyordered.TableAsyncExecutionController;
import org.apache.flink.table.runtime.util.AsyncKeyOrderedTestUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class TableAsyncExecutionControllerTest {
    /** Identity key selector: every test record is keyed by its own integer value. */
    private static final KeySelector<Integer, Integer> keySelector = input -> input;

    /** Result values appended by {@code TestStreamElementQueueEntry#emitResult}. */
    private final Queue<Integer> outputQueue = new LinkedList<>();

    /** Watermarks passed to the controller's emit-watermark callback. */
    private final Queue<Watermark> outputWatermark = new LinkedList<>();

    /** Input records appended by {@code TestStreamElementQueueEntry#emitResult}, in emit order. */
    private final Queue<StreamRecord<Integer>> outputProcessedRecords = new LinkedList<>();

    /** Input indexes appended by {@code TestStreamElementQueueEntry#emitResult}, in emit order. */
    private final Queue<Integer> outputProcessedInputIndexes = new LinkedList<>();

    /** Controller under test; rebuilt in {@link #before()}. */
    private TestAsyncExecutionController asyncExecutionController;

    /** Executor on which {@code Handler} schedules result completion. */
    private MailboxExecutor mailboxExecutor;

    /** Async function invoked by the controller's async-invoke callback. */
    private TestLazyAsyncFunction asyncFunction;

    /**
     * Creates a fresh mailbox executor, opens a new async function and wires both into a new
     * controller before each test.
     *
     * @throws Exception if opening the async function fails
     */
    @BeforeEach
    public void before() throws Exception {
        final TaskMailbox taskMailbox = new TaskMailboxImpl();
        final MailboxProcessor processor =
                new MailboxProcessor(controller -> {}, taskMailbox, StreamTaskActionExecutor.IMMEDIATE);
        this.mailboxExecutor =
                new MailboxExecutorImpl(taskMailbox, 0, StreamTaskActionExecutor.IMMEDIATE, processor);

        this.asyncFunction = new TestLazyAsyncFunction();
        this.asyncFunction.open(DefaultOpenContext.INSTANCE);

        // For each record handed over by the controller: wrap it in a queue entry that records
        // its output into the test queues, and let the async function complete it via a Handler.
        final ThrowingConsumer<AecRecord<Integer, Integer>, Exception> invokeAsync =
                element -> {
                    final StreamRecord<Integer> record = element.getRecord();
                    final TestStreamElementQueueEntry entry =
                            new TestStreamElementQueueEntry(
                                    record,
                                    element.getInputIndex(),
                                    this.outputProcessedRecords,
                                    this.outputProcessedInputIndexes,
                                    this.outputQueue);
                    // The controller field is read when the lambda runs, i.e. after it is assigned below.
                    final Handler handler =
                            new Handler(element, entry, this.mailboxExecutor, this.asyncExecutionController);
                    this.asyncFunction.asyncInvoke(record.getValue(), handler);
                };
        final ThrowingConsumer<Watermark, Exception> collectWatermark = this.outputWatermark::add;

        this.asyncExecutionController = new TestAsyncExecutionController(invokeAsync, collectWatermark);
    }

    /**
     * Closes the async function and the controller, then empties every capture queue.
     *
     * @throws Exception if closing either collaborator fails
     */
    @AfterEach
    public void after() throws Exception {
        this.asyncFunction.close();
        this.asyncExecutionController.close();

        // Clear in the fixed order: results, processed records, processed indexes, watermarks.
        for (Queue<?> captured :
                Arrays.asList(
                        this.outputQueue,
                        this.outputProcessedRecords,
                        this.outputProcessedInputIndexes,
                        this.outputWatermark)) {
            captured.clear();
        }
    }

    /**
     * Submits six records spread over keys 1..4 and checks the per-key collections returned by
     * {@code pendingElements()} against expected {@link AecRecord}s that all share one epoch.
     */
    @Test
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void testPendingRecords() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        // {key, timestamp} pairs in submission order; every record uses input index 0.
        final int[][] submissions = {{1, 1}, {2, 2}, {2, 3}, {3, 4}, {3, 5}, {4, 6}};
        for (int[] submission : submissions) {
            controller.submitRecord(
                    new StreamRecord<Integer>(submission[0], (long) submission[1]), null, 0);
        }
        final Map<?, ?> pending = controller.pendingElements();

        // The expected epoch carries the minimum watermark and a count bumped once per record.
        final Epoch<Integer> sharedEpoch = new Epoch<>(new Watermark(Long.MIN_VALUE));
        for (int i = 0; i < 6; i++) {
            sharedEpoch.incrementCount();
        }

        final AecRecord[] expectedKey1 = {
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(1, 1L), sharedEpoch, 0)
        };
        final AecRecord[] expectedKey2 = {
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(2, 2L), sharedEpoch, 0),
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(2, 3L), sharedEpoch, 0)
        };
        final AecRecord[] expectedKey3 = {
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(3, 4L), sharedEpoch, 0),
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(3, 5L), sharedEpoch, 0)
        };
        final AecRecord[] expectedKey4 = {
            new AecRecord<Integer, Integer>(new StreamRecord<Integer>(4, 6L), sharedEpoch, 0)
        };

        // Keys with two records are checked in exact order.
        Assertions.assertThat((Collection) pending.get(1))
                .containsExactlyInAnyOrder((Object[]) expectedKey1);
        Assertions.assertThat((Collection) pending.get(2))
                .containsExactly((Object[]) expectedKey2);
        Assertions.assertThat((Collection) pending.get(3))
                .containsExactly((Object[]) expectedKey3);
        Assertions.assertThat((Collection) pending.get(4))
                .containsExactlyInAnyOrder((Object[]) expectedKey4);
    }

    /**
     * Submits six records with distinct keys and no watermark, then checks the submitted count,
     * the sorted output values and that the active epoch equals one built from the minimum
     * watermark.
     */
    @Test
    public void testDifferentKeyWithoutWatermark() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        // Record i has key i and timestamp i, for i = 1..6.
        for (int key = 1; key <= 6; key++) {
            controller.submitRecord(new StreamRecord<Integer>(key, (long) key), null, 0);
        }
        Assertions.assertThat(controller.processedSize()).isEqualTo(6L);

        this.waitComplete();

        // Output order across keys is not asserted, so compare the sorted values.
        final Queue<Integer> expectedOutput = new LinkedList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        Assertions.assertThat(this.outputQueue.stream().sorted()).isEqualTo(expectedOutput);

        final Epoch<Integer> expectedEpoch = new Epoch<>(new Watermark(Long.MIN_VALUE));
        Assertions.assertThat(controller.getActiveEpoch()).isEqualTo(expectedEpoch);
    }

    /**
     * Submits six records with distinct keys interleaved with watermarks 3 and 6, then checks the
     * sorted output values, the emitted watermarks and the active epoch.
     */
    @Test
    public void testDifferentKeyWithWatermark() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        // Two records, then watermark 3.
        controller.submitRecord(new StreamRecord<Integer>(1, 1L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(2, 2L), null, 0);
        controller.submitWatermark(new Watermark(3L));

        // Two records, then watermark 6.
        controller.submitRecord(new StreamRecord<Integer>(3, 4L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(4, 5L), null, 0);
        controller.submitWatermark(new Watermark(6L));

        // Two trailing records with no watermark after them.
        controller.submitRecord(new StreamRecord<Integer>(5, 7L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(6, 8L), null, 0);

        Assertions.assertThat(controller.processedSize()).isEqualTo(6L);

        this.waitComplete();

        final Queue<Integer> expectedOutput = new LinkedList<>(Arrays.asList(1, 2, 3, 4, 5, 6));
        Assertions.assertThat(this.outputQueue.stream().sorted()).isEqualTo(expectedOutput);

        final Queue<Watermark> expectedWatermark =
                new LinkedList<>(Arrays.asList(new Watermark(3L), new Watermark(6L)));
        Assertions.assertThat(this.outputWatermark).isEqualTo(expectedWatermark);

        final Epoch<Integer> expectedEpoch = new Epoch<>(new Watermark(6L));
        Assertions.assertThat(controller.getActiveEpoch()).isEqualTo(expectedEpoch);
    }

    /**
     * Submits six records that all use key 1, interleaved with watermarks 5, 6 and 9, then checks
     * that outputs and processed input records appear in submission order, along with the emitted
     * watermarks and the active epoch.
     */
    @Test
    public void testSameKeyWithWatermark() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        // Four records for key 1 with timestamps 1..4.
        for (long timestamp = 1L; timestamp <= 4L; timestamp++) {
            controller.submitRecord(new StreamRecord<Integer>(1, timestamp), null, 0);
        }
        controller.submitWatermark(new Watermark(5L));
        controller.submitWatermark(new Watermark(6L));
        controller.submitRecord(new StreamRecord<Integer>(1, 7L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(1, 8L), null, 0);
        controller.submitWatermark(new Watermark(9L));

        Assertions.assertThat(controller.processedSize()).isEqualTo(6L);

        this.waitComplete();

        final Queue<Integer> expectedOutput = new LinkedList<>(Collections.nCopies(6, 1));
        Assertions.assertThat(this.outputQueue).isEqualTo(expectedOutput);

        // Processed records are compared in exact submission order.
        final Queue<StreamRecord<Integer>> expectedProcessed = new LinkedList<>();
        for (long timestamp : new long[] {1L, 2L, 3L, 4L, 7L, 8L}) {
            expectedProcessed.add(new StreamRecord<Integer>(1, timestamp));
        }
        Assertions.assertThat(this.outputProcessedRecords).isEqualTo(expectedProcessed);

        final Queue<Watermark> expectedWatermark =
                new LinkedList<>(
                        Arrays.asList(new Watermark(5L), new Watermark(6L), new Watermark(9L)));
        Assertions.assertThat(this.outputWatermark).isEqualTo(expectedWatermark);

        final Epoch<Integer> expectedEpoch = new Epoch<>(new Watermark(9L));
        Assertions.assertThat(controller.getActiveEpoch()).isEqualTo(expectedEpoch);
    }

    /**
     * Submits records for keys 1, 3 and 4 (two each) interleaved with watermarks 5, 6 and 9, then
     * checks the sorted output values, the per-key ordering via
     * {@code AsyncKeyOrderedTestUtils.assertKeyOrdered}, the emitted watermarks and the active
     * epoch.
     */
    @Test
    public void testMixKeyWithWatermark() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        controller.submitRecord(new StreamRecord<Integer>(1, 1L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(3, 2L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(4, 3L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(3, 4L), null, 0);
        controller.submitWatermark(new Watermark(5L));
        controller.submitWatermark(new Watermark(6L));
        controller.submitRecord(new StreamRecord<Integer>(4, 7L), null, 0);
        controller.submitRecord(new StreamRecord<Integer>(1, 8L), null, 0);
        controller.submitWatermark(new Watermark(9L));

        Assertions.assertThat(controller.processedSize()).isEqualTo(6L);

        this.waitComplete();

        final Queue<Integer> expectedOutput = new LinkedList<>(Arrays.asList(1, 1, 3, 3, 4, 4));
        Assertions.assertThat(this.outputQueue.stream().sorted()).isEqualTo(expectedOutput);

        // One expectation per key, checked in the order key 1, key 3, key 4.
        final LinkedList<StreamRecord<Integer>> expectedForKey1 =
                new LinkedList<>(
                        Arrays.asList(
                                new StreamRecord<Integer>(1, 1L), new StreamRecord<Integer>(1, 8L)));
        AsyncKeyOrderedTestUtils.assertKeyOrdered(this.outputProcessedRecords, expectedForKey1);

        final LinkedList<StreamRecord<Integer>> expectedForKey3 =
                new LinkedList<>(
                        Arrays.asList(
                                new StreamRecord<Integer>(3, 2L), new StreamRecord<Integer>(3, 4L)));
        AsyncKeyOrderedTestUtils.assertKeyOrdered(this.outputProcessedRecords, expectedForKey3);

        final LinkedList<StreamRecord<Integer>> expectedForKey4 =
                new LinkedList<>(
                        Arrays.asList(
                                new StreamRecord<Integer>(4, 3L), new StreamRecord<Integer>(4, 7L)));
        AsyncKeyOrderedTestUtils.assertKeyOrdered(this.outputProcessedRecords, expectedForKey4);

        final Queue<Watermark> expectedWatermark =
                new LinkedList<>(
                        Arrays.asList(new Watermark(5L), new Watermark(6L), new Watermark(9L)));
        Assertions.assertThat(this.outputWatermark).isEqualTo(expectedWatermark);

        final Epoch<Integer> expectedEpoch = new Epoch<>(new Watermark(9L));
        Assertions.assertThat(controller.getActiveEpoch()).isEqualTo(expectedEpoch);
    }

    /**
     * Submits five records for key 1 from input indexes 1, 2, 1, 3 and 4 with one watermark before
     * the last record, then checks that outputs, processed records and processed input indexes
     * appear in submission order, along with the emitted watermark and the active epoch.
     */
    @Test
    public void testProcessWithMultiInputs() throws Exception {
        final TestAsyncExecutionController controller = this.asyncExecutionController;

        // Parallel arrays: record i has timestamp timestamps[i] and arrives on inputIndexes[i].
        final long[] timestamps = {1L, 2L, 3L, 4L};
        final int[] inputIndexes = {1, 2, 1, 3};
        for (int i = 0; i < timestamps.length; i++) {
            controller.submitRecord(
                    new StreamRecord<Integer>(1, timestamps[i]), null, inputIndexes[i]);
        }
        controller.submitWatermark(new Watermark(5L));
        controller.submitRecord(new StreamRecord<Integer>(1, 6L), null, 4);

        Assertions.assertThat(controller.processedSize()).isEqualTo(5L);

        this.waitComplete();

        final Queue<Integer> expectedOutput = new LinkedList<>(Collections.nCopies(5, 1));
        Assertions.assertThat(this.outputQueue).isEqualTo(expectedOutput);

        final Queue<StreamRecord<Integer>> expectedProcessed = new LinkedList<>();
        for (long timestamp : new long[] {1L, 2L, 3L, 4L, 6L}) {
            expectedProcessed.add(new StreamRecord<Integer>(1, timestamp));
        }
        Assertions.assertThat(this.outputProcessedRecords).isEqualTo(expectedProcessed);

        final Queue<Watermark> expectedWatermark =
                new LinkedList<>(Collections.singletonList(new Watermark(5L)));
        Assertions.assertThat(this.outputWatermark).isEqualTo(expectedWatermark);

        final Epoch<Integer> expectedEpoch = new Epoch<>(new Watermark(5L));
        Assertions.assertThat(controller.getActiveEpoch()).isEqualTo(expectedEpoch);

        final Queue<Integer> expectedProcessedInputIndexes =
                new LinkedList<>(Arrays.asList(1, 2, 1, 3, 4));
        Assertions.assertThat(this.outputProcessedInputIndexes)
                .isEqualTo(expectedProcessedInputIndexes);
    }

    /**
     * Keeps yielding to the mailbox executor until {@code tryYield()} returns {@code false},
     * failing the test once more than three seconds have elapsed.
     *
     * <p>Elapsed time is measured with {@link System#nanoTime()}, which is monotonic, so a
     * wall-clock adjustment during the test can neither trigger a spurious timeout nor hide a
     * real one.
     */
    private void waitComplete() {
        final long timeoutNanos = 3000L * 1_000_000L;
        final long startNanos = System.nanoTime();
        while (this.mailboxExecutor.tryYield()) {
            // Subtract first, then compare: this stays correct even if nanoTime wraps around.
            if (System.nanoTime() - startNanos > timeoutNanos) {
                org.junit.jupiter.api.Assertions.fail("Execution timeout");
            }
        }
    }

    private static class TestLazyAsyncFunction
    extends AsyncWaitOperatorTest.LazyAsyncFunction {
        public void asyncInvoke(Integer input, ResultFuture<Integer> resultFuture) {
            resultFuture.complete(Collections.singletonList(input));
        }
    }

    private static class TestAsyncExecutionController
    extends TableAsyncExecutionController<Integer, Integer, Integer> {
        private final AtomicLong processedAccount = new AtomicLong(0L);

        public TestAsyncExecutionController(ThrowingConsumer<AecRecord<Integer, Integer>, Exception> asyncInvoke, ThrowingConsumer<Watermark, Exception> emitWatermark) {
            super(asyncInvoke, emitWatermark, entry -> entry.emitResult(null), entry -> ((TestStreamElementQueueEntry)entry).getInputIndex(), (record, inputIndex) -> (Integer)keySelector.getKey((Object)((Integer)record.getValue())));
        }

        public void submitRecord(StreamRecord<Integer> record, @Nullable Epoch<Integer> epoch, int inputIndex) throws Exception {
            this.processedAccount.incrementAndGet();
            super.submitRecord(record, epoch, inputIndex);
        }

        public long processedSize() {
            return this.processedAccount.get();
        }
    }

    private static class TestStreamElementQueueEntry
    implements StreamElementQueueEntry<Integer> {
        @Nonnull
        private final StreamRecord<Integer> inputRecord;
        private Collection<Integer> results;
        private final int inputIndex;
        private final Queue<StreamRecord<Integer>> outputProcessedRecords;
        private final Queue<Integer> outputProcessedInputIndexes;
        private final Queue<Integer> outputQueue;

        public TestStreamElementQueueEntry(@Nonnull StreamRecord<Integer> inputRecord, int inputIndex, Queue<StreamRecord<Integer>> outputProcessedRecords, Queue<Integer> outputProcessedInputIndexes, Queue<Integer> outputQueue) {
            this.inputRecord = inputRecord;
            this.inputIndex = inputIndex;
            this.outputProcessedRecords = outputProcessedRecords;
            this.outputProcessedInputIndexes = outputProcessedInputIndexes;
            this.outputQueue = outputQueue;
        }

        public void complete(Collection<Integer> result) {
            this.results = result;
        }

        public boolean isDone() {
            return false;
        }

        public void emitResult(TimestampedCollector<Integer> output) {
            this.outputQueue.addAll(this.results);
            this.outputProcessedRecords.add(this.inputRecord);
            this.outputProcessedInputIndexes.add(this.inputIndex);
        }

        @Nonnull
        public StreamElement getInputElement() {
            return this.inputRecord;
        }

        public int getInputIndex() {
            return this.inputIndex;
        }
    }

    private static class Handler
    implements ResultFuture<Integer> {
        private final AecRecord<Integer, Integer> inputRecord;
        private final StreamElementQueueEntry<Integer> resultFuture;
        private final MailboxExecutor mailboxExecutor;
        private final TestAsyncExecutionController asyncExecutionController;

        public Handler(AecRecord<Integer, Integer> inputRecord, StreamElementQueueEntry<Integer> resultFuture, MailboxExecutor mailboxExecutor, TestAsyncExecutionController asyncExecutionController) {
            this.inputRecord = inputRecord;
            this.resultFuture = resultFuture;
            this.mailboxExecutor = mailboxExecutor;
            this.asyncExecutionController = asyncExecutionController;
        }

        public void complete(Collection<Integer> results) {
            this.mailboxExecutor.execute(() -> {
                try {
                    this.resultFuture.complete(results);
                    this.asyncExecutionController.completeRecord(this.resultFuture, this.inputRecord);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, "handler complete results");
        }

        public void completeExceptionally(Throwable error) {
        }

        public void complete(CollectionSupplier<Integer> supplier) {
        }
    }
}

