/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.concurrent.ExecutorService;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.asyncprocessing.AsyncFuture;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.SimpleAsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncKeyOrderedStreamOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncKeyOrderedProcessingOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.CheckedSupplier;

@Internal
public abstract class AbstractAsyncRunnableStreamOperator<OUT>
extends AbstractAsyncKeyOrderedStreamOperator<OUT>
implements AsyncKeyOrderedProcessingOperator {
    final KeySelector<?, ?> keySelector1;
    final KeySelector<?, ?> keySelector2;
    final ExecutorService asyncThreadPool;
    final int asyncBufferSize;
    final long asyncBufferTimeout;
    final int inFlightRecordsLimit;

    public AbstractAsyncRunnableStreamOperator(KeySelector<?, ?> keySelector1, KeySelector<?, ?> keySelector2, ExecutorService asyncThreadPool, int asyncBufferSize, long asyncBufferTimeout, int inFlightRecordsLimit) {
        this.keySelector1 = keySelector1;
        this.keySelector2 = keySelector2;
        this.asyncThreadPool = asyncThreadPool;
        this.asyncBufferSize = asyncBufferSize;
        this.asyncBufferTimeout = asyncBufferTimeout;
        this.inFlightRecordsLimit = inFlightRecordsLimit;
    }

    @Override
    protected KeySelector getKeySelectorForAsyncKeyedContext(int index) {
        switch (index) {
            case 1: {
                return this.keySelector1;
            }
            case 2: {
                return this.keySelector2;
            }
        }
        throw new ArrayIndexOutOfBoundsException("Try to get key selector for index " + index);
    }

    @Override
    protected AsyncExecutionController createAsyncExecutionController() {
        if (this.isAsyncKeyOrderedProcessingEnabled()) {
            StreamTask containingTask = (StreamTask)Preconditions.checkNotNull(this.getContainingTask());
            MailboxExecutor mailboxExecutor = containingTask.getMailboxExecutorFactory().createExecutor(this.getOperatorConfig().getChainIndex());
            int maxParallelism = this.environment.getTaskInfo().getMaxNumberOfParallelSubtasks();
            return new SimpleAsyncExecutionController(mailboxExecutor, this::handleAsyncException, this.asyncThreadPool, this.getDeclarationManager(), this.getEpochParallelMode(), maxParallelism, this.asyncBufferSize, this.asyncBufferTimeout, this.inFlightRecordsLimit, null, this.getMetricGroup().addGroup("asyncProcessing"));
        }
        return null;
    }

    protected EpochManager.ParallelMode getEpochParallelMode() {
        return EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH;
    }

    protected <RET> AsyncFuture<RET> asyncProcess(CheckedSupplier<RET> runnable) {
        return ((SimpleAsyncExecutionController)this.asyncExecutionController).handleRequest(runnable, false);
    }
}

