package org.apache.flink.runtime.asyncprocessing;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.InternalStateFuture;
import org.apache.flink.core.state.StateFutureImpl;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.asyncprocessing.EpochManager;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.v2.internal.InternalPartitionedState;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.class */
public class AsyncExecutionController<K> implements StateRequestHandler, Closeable {
    /** Logger for this controller. */
    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);
    /**
     * Check interval passed to the {@link StateRequestBuffer} next to the buffer timeout.
     * NOTE(review): the unit is not visible here; presumably milliseconds - confirm.
     */
    private static final long DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL = 100;
    /**
     * Active-queue size at which a non-forced {@link #triggerIfNeeded(boolean)} fires; also the
     * size argument handed to the buffer when a batch is popped.
     */
    private final int batchSize;
    /** Wraps the mailbox executor; constructed with {@link #notifyNewMail()} as its callback. */
    private final CallbackRunnerWrapper callbackRunner;
    /** Timeout value passed to the {@link StateRequestBuffer} and reported in the startup log. */
    private final long bufferTimeout;
    /** In-flight record limit that {@link #seizeCapacity(boolean)} drains down to. */
    private final int maxInFlightRecordNum;
    /** Executor used to yield to pending mails and to run the buffer-timeout trigger. */
    private final MailboxExecutor mailboxExecutor;
    /** Receives failures raised while processing non-record actions. */
    private final StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler;
    /** Tracks which keys are currently occupied by a record. */
    final KeyAccountingUnit<K> keyAccountingUnit;
    /** Creates the future returned for every state request. */
    private final StateFutureFactory<K> stateFutureFactory;
    /** Executes batched requests and synchronous requests. */
    private final StateExecutor stateExecutor;
    /** Informed on every context switch; supplies the variable count for fresh contexts. */
    private final DeclarationManager declarationManager;
    /** The record context requests are currently attributed to; may be {@code null}. */
    RecordContext<K> currentContext;
    /** Holds the active and blocking request queues. */
    StateRequestBuffer<K> stateRequestsBuffer;
    /** Max parallelism used when assigning a key to its key group. */
    private final int maxParallelism;
    /** Optional listener notified when the current context changes; may be {@code null}. */
    final SwitchContextListener<K> switchContextListener;
    // Not read anywhere in the code shown here; the constructor passes the constant directly.
    private final long bufferTimeoutCheckInterval = DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL;
    /** Parallel mode forwarded to the epoch manager for non-record processing. */
    final EpochManager.ParallelMode epochParallelMode = EpochManager.ParallelMode.SERIAL_BETWEEN_EPOCH;
    /** Monitor shared by {@link #waitForNewMails()} and {@link #notifyNewMail()}. */
    private final Object notifyLock = new Object();
    /** Set while a thread is parked inside {@link #waitForNewMails()}. */
    private volatile boolean waitingMail = false;
    /** Current nesting depth of {@code drainInflightRecords} invocations. */
    private int drainDepth = 0;
    /** Incremented in {@link #seizeCapacity(boolean)}, decremented when a context is disposed. */
    final AtomicInteger inFlightRecordNum = new AtomicInteger(0);
    /** Epoch bookkeeping for records and non-record events; constructed with this controller. */
    final EpochManager epochManager = new EpochManager(this);

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/AsyncExecutionController$SwitchContextListener.class */
    /**
     * Callback invoked by {@code AsyncExecutionController#setCurrentContext} whenever the
     * controller's current record context actually changes.
     *
     * @param <K> key type of the record contexts
     */
    public interface SwitchContextListener<K> {
        /**
         * Called after the controller has switched to {@code recordContext}.
         *
         * @param recordContext the newly installed context; {@code null} while a non-record
         *     action is running
         */
        void switchContext(@Nullable RecordContext<K> recordContext);
    }

    /**
     * Creates the controller and wires up its collaborators.
     *
     * @param mailboxExecutor executor used for yielding and for running the buffer-timeout trigger
     * @param exceptionHandler handler given to the future factory and used for non-record failures
     * @param stateExecutor executor that runs state requests
     * @param declarationManager manager informed about context switches
     * @param maxParallelism max parallelism used for key-group assignment
     * @param batchSize active-queue size that triggers a batch
     * @param bufferTimeout timeout handed to the request buffer
     * @param maxInFlightRecords limit on concurrently in-flight records
     * @param switchContextListener optional listener for context switches
     * @param metricGroup optional metric group; when non-null, four gauges are registered on it
     */
    public AsyncExecutionController(
            MailboxExecutor mailboxExecutor,
            StateFutureImpl.AsyncFrameworkExceptionHandler exceptionHandler,
            StateExecutor stateExecutor,
            DeclarationManager declarationManager,
            int maxParallelism,
            int batchSize,
            long bufferTimeout,
            int maxInFlightRecords,
            @Nullable SwitchContextListener<K> switchContextListener,
            @Nullable MetricGroup metricGroup) {
        this.keyAccountingUnit = new KeyAccountingUnit<>(maxInFlightRecords);
        this.mailboxExecutor = mailboxExecutor;
        this.exceptionHandler = exceptionHandler;
        this.callbackRunner = new CallbackRunnerWrapper(mailboxExecutor, this::notifyNewMail);
        this.stateFutureFactory =
                new StateFutureFactory<>(this, this.callbackRunner, exceptionHandler);
        this.stateExecutor = stateExecutor;
        this.declarationManager = declarationManager;
        this.batchSize = batchSize;
        this.bufferTimeout = bufferTimeout;
        this.maxInFlightRecordNum = maxInFlightRecords;
        this.maxParallelism = maxParallelism;
        // The buffer calls back with a sequence number; the trigger is only forced when that
        // sequence is still the buffer's current one by the time the mail runs.
        this.stateRequestsBuffer =
                new StateRequestBuffer<>(
                        bufferTimeout,
                        DEFAULT_BUFFER_TIMEOUT_CHECK_INTERVAL,
                        scheduledSeq ->
                                mailboxExecutor.execute(
                                        () -> {
                                            if (this.stateRequestsBuffer.checkCurrentSeq(scheduledSeq)) {
                                                triggerIfNeeded(true);
                                            }
                                        },
                                        "AEC-buffer-timeout"));
        this.switchContextListener = switchContextListener;

        if (metricGroup != null) {
            metricGroup.gauge("numInFlightRecords", this::getInFlightRecordNum);
            metricGroup.gauge("activeBufferSize", () -> this.stateRequestsBuffer.activeQueueSize());
            metricGroup.gauge("blockingBufferSize", () -> this.stateRequestsBuffer.blockingQueueSize());
            metricGroup.gauge("numBlockingKeys", () -> this.stateRequestsBuffer.blockingKeyNum());
        }

        LOG.info(
                "Create AsyncExecutionController: batchSize {}, bufferTimeout {}, maxInFlightRecordNum {}, epochParallelMode {}",
                this.batchSize,
                this.bufferTimeout,
                this.maxInFlightRecordNum,
                this.epochParallelMode);
    }

    /**
     * Builds a record context for the given record and key without inheriting anything from the
     * current context.
     *
     * @param record the record, or {@code null} for an empty record
     * @param key the record's key
     * @return the newly built context
     */
    public RecordContext<K> buildContext(Object record, K key) {
        final boolean inheritEpoch = false;
        return buildContext(record, key, inheritEpoch);
    }

    /**
     * Builds a record context for the given record and key.
     *
     * <p>When {@code inheritEpoch} is set and a current context exists, the new context reuses the
     * current context's epoch and variables reference and gets a priority one higher than the
     * current context. Otherwise a fresh epoch entry and a fresh variable count are used.
     *
     * @param record the record, or {@code null} to use {@code RecordContext.EMPTY_RECORD}
     * @param key the record's key
     * @param inheritEpoch whether to derive the context from the current one when available
     * @return the newly built context
     */
    public RecordContext<K> buildContext(Object record, K key, boolean inheritEpoch) {
        final Object effectiveRecord = (record != null) ? record : RecordContext.EMPTY_RECORD;
        final int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, this.maxParallelism);
        final RecordContext<K> parent = this.currentContext;

        if (inheritEpoch && parent != null) {
            return new RecordContext<>(
                    effectiveRecord,
                    key,
                    this::disposeContext,
                    keyGroup,
                    this.epochManager.onEpoch(parent.getEpoch()),
                    parent.getVariablesReference(),
                    parent.getPriority() + 1);
        }
        return new RecordContext<>(
                effectiveRecord,
                key,
                this::disposeContext,
                keyGroup,
                this.epochManager.onRecord(),
                this.declarationManager.variableCount());
    }

    /**
     * Installs {@code switchingContext} as the current context. Does nothing when it is already
     * the current one (identity comparison); otherwise the declaration manager and, if present,
     * the switch-context listener are informed.
     *
     * @param switchingContext the context to switch to; may be {@code null}
     */
    public void setCurrentContext(RecordContext<K> switchingContext) {
        if (this.currentContext == switchingContext) {
            return;
        }
        this.currentContext = switchingContext;
        this.declarationManager.setCurrentContext(switchingContext);
        if (this.switchContextListener == null) {
            return;
        }
        this.switchContextListener.switchContext(switchingContext);
    }

    /**
     * Returns the context that requests are currently attributed to.
     *
     * @return the current context, possibly {@code null}
     */
    public RecordContext<K> getCurrentContext() {
        return currentContext;
    }

    /**
     * Releases everything held on behalf of a finished record: completes the record in its epoch,
     * releases its key, decrements the in-flight counter and, if a request was blocked on the same
     * key, moves one such request to the active buffer.
     *
     * @param toDispose the context of the finished record
     */
    void disposeContext(RecordContext<K> toDispose) {
        this.epochManager.completeOneRecord(toDispose.getEpoch());
        this.keyAccountingUnit.release(toDispose.getRecord(), toDispose.getKey());
        this.inFlightRecordNum.decrementAndGet();

        StateRequest<K, ?, ?, ?> unblocked =
                this.stateRequestsBuffer.unblockOneByKey(toDispose.getKey());
        // A reference must be tested against null; the former comparison with the int literal 0
        // is not valid Java.
        if (unblocked != null) {
            // The key was just released above, so occupying it for the unblocked request is
            // expected to succeed.
            Preconditions.checkState(tryOccupyKey(unblocked.getRecordContext()));
            insertActiveBuffer(unblocked);
        }
    }

    /**
     * Ensures the given context holds its key, occupying it if it does not yet.
     *
     * @param recordContext the context that wants its key
     * @return {@code true} if the key was already occupied by this context or was occupied now;
     *     {@code false} if the key accounting unit refused the occupation
     */
    boolean tryOccupyKey(RecordContext<K> recordContext) {
        if (recordContext.isKeyOccupied()) {
            return true;
        }
        if (this.keyAccountingUnit.occupy(recordContext.getRecord(), recordContext.getKey())) {
            recordContext.setKeyOccupied();
            return true;
        }
        return false;
    }

    /**
     * Submits an asynchronous state request for the current context, without overdraft.
     *
     * @param state the state to access; may be {@code null}
     * @param type the request type
     * @param payload the request payload; may be {@code null}
     * @return the future that will carry the result
     */
    @Override
    public <IN, OUT> InternalStateFuture<OUT> handleRequest(
            @Nullable State state, StateRequestType type, @Nullable IN payload) {
        final boolean sync = false;
        final boolean allowOverdraft = false;
        return handleRequest(state, type, sync, payload, allowOverdraft);
    }

    /**
     * Submits a state request for the current context.
     *
     * <p>Steps, in order: create the future, build the request, seize in-flight capacity, then put
     * the request into the active buffer if the context holds (or can take) its key, or into the
     * blocking buffer otherwise. Finally a non-forced trigger is attempted.
     *
     * @param state the state to access; may be {@code null}
     * @param type the request type
     * @param sync whether the request is synchronous; {@code SYNC_POINT} requests are always
     *     treated as synchronous
     * @param payload the request payload; may be {@code null}
     * @param allowOverdraft when {@code true}, the capacity drain is not forced
     * @return the future that will carry the result
     */
    public <IN, OUT> InternalStateFuture<OUT> handleRequest(
            @Nullable State state,
            StateRequestType type,
            boolean sync,
            @Nullable IN payload,
            boolean allowOverdraft) {
        final InternalStateFuture<OUT> future = this.stateFutureFactory.create(this.currentContext);
        final boolean treatAsSync = sync || type == StateRequestType.SYNC_POINT;
        final StateRequest<K, ?, IN, OUT> request =
                new StateRequest<>(state, type, treatAsSync, payload, future, this.currentContext);

        seizeCapacity(allowOverdraft);

        final boolean keyHeld = tryOccupyKey(this.currentContext);
        if (!keyHeld) {
            insertBlockingBuffer(request);
        } else {
            insertActiveBuffer(request);
        }
        triggerIfNeeded(false);
        return future;
    }

    /**
     * Submits a synchronous state request and waits until its result is available.
     *
     * <p>While waiting, the method yields to pending mails; when there are none it force-triggers
     * the buffer (unless the state executor is fully loaded) and then waits briefly for new mails.
     * The wait is not abandoned on interruption, but the interruption is remembered and the
     * thread's interrupt status is restored before returning, so callers can still observe it.
     *
     * @param state the state to access
     * @param type the request type
     * @param payload the request payload; may be {@code null}
     * @return the result of the request
     */
    @Override
    public <IN, OUT> OUT handleRequestSync(
            State state, StateRequestType type, @Nullable IN payload) {
        InternalStateFuture<OUT> future = handleRequest(state, type, true, payload, false);
        boolean interrupted = false;
        try {
            if (!future.isDone()) {
                // Someone is waiting for this result, so do not wait for a full batch.
                triggerIfNeeded(true);
                while (!future.isDone()) {
                    try {
                        if (!this.mailboxExecutor.tryYield()) {
                            if (!this.stateExecutor.fullyLoaded()) {
                                triggerIfNeeded(true);
                            }
                            waitForNewMails();
                        }
                    } catch (InterruptedException e) {
                        // Keep waiting for the result, but do not lose the interruption. The flag
                        // is re-set only after the loop; setting it here would make every
                        // subsequent wait() throw immediately.
                        interrupted = true;
                    }
                }
            }
            return future.get();
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Records the namespace to use for the given state on the current context.
     *
     * @param state the partitioned state the namespace applies to
     * @param namespace the namespace value
     */
    @Override
    public <N> void setCurrentNamespaceForState(
            @Nonnull InternalPartitionedState<N> state, N namespace) {
        final RecordContext<K> context = this.currentContext;
        context.setNamespace(state, namespace);
    }

    /**
     * Dispatches a request whose key is held by its context.
     *
     * <ul>
     *   <li>asynchronous requests are enqueued to the active queue;
     *   <li>{@code SYNC_POINT} requests complete their future with {@code null} right away;
     *   <li>all other synchronous requests are executed directly by the state executor.
     * </ul>
     *
     * @param request the request to dispatch
     */
    <IN, OUT> void insertActiveBuffer(StateRequest<K, ?, IN, OUT> request) {
        if (!request.isSync()) {
            this.stateRequestsBuffer.enqueueToActive(request);
            return;
        }
        if (request.getRequestType() == StateRequestType.SYNC_POINT) {
            request.getFuture().complete(null);
            return;
        }
        this.stateExecutor.executeRequestSync(request);
    }

    /**
     * Parks a request in the blocking queue of the request buffer.
     *
     * @param request the request to park
     */
    <IN, OUT> void insertBlockingBuffer(StateRequest<K, ?, IN, OUT> request) {
        final StateRequestBuffer<K> buffer = this.stateRequestsBuffer;
        buffer.enqueueToBlocking(request);
    }

    /**
     * Pops a batch from the active queue and hands it to the state executor.
     *
     * @param force when {@code false}, nothing happens unless the active queue has reached the
     *     batch size
     * @return {@code true} if a non-empty batch was submitted, {@code false} otherwise
     */
    public boolean triggerIfNeeded(boolean force) {
        final boolean batchReady =
                force || this.stateRequestsBuffer.activeQueueSize() >= this.batchSize;
        if (!batchReady) {
            return false;
        }

        final Optional<StateRequestContainer> popped =
                this.stateRequestsBuffer.popActive(
                        this.batchSize, () -> this.stateExecutor.createStateRequestContainer());
        if (popped.isPresent()) {
            final StateRequestContainer batch = popped.get();
            if (!batch.isEmpty()) {
                this.stateExecutor.executeBatchRequests(batch);
                this.stateRequestsBuffer.advanceSeq();
                return true;
            }
        }
        return false;
    }

    /**
     * Accounts for the current record as in-flight, unless its context already holds its key (in
     * which case nothing is done). Before counting, in-flight records are drained down to the
     * configured maximum; the drain is forced unless overdraft is allowed.
     *
     * @param allowOverdraft when {@code true}, the drain is not forced
     */
    private void seizeCapacity(boolean allowOverdraft) {
        if (!this.currentContext.isKeyOccupied()) {
            final boolean forceDrain = !allowOverdraft;
            drainInflightRecords(this.maxInFlightRecordNum, forceDrain);
            this.inFlightRecordNum.incrementAndGet();
        }
    }

    /**
     * Submits a {@code SYNC_POINT} request for the current context and runs {@code callback} once
     * that request's future completes.
     *
     * @param callback the action to run after the sync point
     * @param allowOverdraft forwarded to the capacity check of the underlying request
     * @return a future that completes after the callback has run
     */
    public StateFuture<Void> syncPointRequestWithCallback(
            ThrowingRunnable<Exception> callback, boolean allowOverdraft) {
        return handleRequest(null, StateRequestType.SYNC_POINT, true, null, allowOverdraft)
                .thenAccept(ignored -> callback.run());
    }

    /**
     * Forcibly drains in-flight records until at most {@code targetNum} remain.
     *
     * @param targetNum the number of in-flight records to drain down to
     */
    public void drainInflightRecords(int targetNum) {
        final boolean forceToWait = true;
        drainInflightRecords(targetNum, forceToWait);
    }

    /**
     * Drains in-flight records until at most {@code targetNum} remain.
     *
     * <p>A non-forced drain is skipped entirely once the nesting depth exceeds 5. Inside the loop
     * the method first yields to pending mails; if there are none it may force-trigger the buffer
     * and then either waits for new mails or, for a non-forced drain with nothing to wait for,
     * gives up. The context that was current on entry is re-installed on every exit path.
     *
     * @param targetNum the number of in-flight records to drain down to
     * @param forceToWait whether the drain must keep waiting until the target is reached
     */
    private void drainInflightRecords(int targetNum, boolean forceToWait) {
        if (!forceToWait && this.drainDepth > 5) {
            return;
        }
        final RecordContext<K> contextOnEntry = this.currentContext;
        this.drainDepth++;
        try {
            while (this.inFlightRecordNum.get() > targetNum) {
                if (this.mailboxExecutor.tryYield()) {
                    // A mail was processed; re-check the in-flight count.
                    continue;
                }
                boolean triggered = false;
                if (targetNum == 0 || !this.stateExecutor.fullyLoaded()) {
                    triggered = triggerIfNeeded(true);
                }
                final boolean worthWaiting =
                        forceToWait
                                || triggered
                                || this.stateExecutor.fullyLoaded()
                                || this.callbackRunner.isHasMail();
                if (!worthWaiting) {
                    break;
                }
                waitForNewMails();
            }
        } catch (InterruptedException ignored) {
            // An interruption ends the drain early and is otherwise dropped.
            // NOTE(review): the interrupt status is not restored here - confirm this is intended.
        } finally {
            this.drainDepth--;
            setCurrentContext(contextOnEntry);
        }
    }

    /**
     * Waits on {@code notifyLock} for up to one millisecond unless the callback runner already
     * reports a pending mail. The check is repeated under the lock so that a mail arriving between
     * the first check and the lock acquisition is not missed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void waitForNewMails() throws InterruptedException {
        if (this.callbackRunner.isHasMail()) {
            return;
        }
        synchronized (this.notifyLock) {
            if (!this.callbackRunner.isHasMail()) {
                this.waitingMail = true;
                try {
                    this.notifyLock.wait(1L);
                } finally {
                    // Reset even when wait() is interrupted; otherwise the flag would stay set
                    // and notifyNewMail() would keep taking the lock although nobody is waiting.
                    this.waitingMail = false;
                }
            }
        }
    }

    /**
     * Wakes up a thread parked in {@link #waitForNewMails()}, if any. The flag is read once
     * without the lock to skip the synchronization when nobody waits, and again under the lock
     * before notifying.
     */
    private void notifyNewMail() {
        if (!this.waitingMail) {
            return;
        }
        synchronized (this.notifyLock) {
            if (!this.waitingMail) {
                return;
            }
            this.notifyLock.notify();
        }
    }

    /**
     * Registers a non-record event with the epoch manager.
     *
     * <p>Each non-null action is wrapped so that it runs with no current record context, and so
     * that any exception it throws is reported to the exception handler. {@code null} actions are
     * forwarded as {@code null}.
     *
     * <p>NOTE(review): the two actions are passed positionally to {@code EpochManager#onNonRecord};
     * presumably they are the epoch's trigger and final actions - confirm against EpochManager.
     *
     * @param firstAction first action forwarded to the epoch manager; may be {@code null}
     * @param secondAction second action forwarded to the epoch manager; may be {@code null}
     */
    public void processNonRecord(
            @Nullable ThrowingRunnable<? extends Exception> firstAction,
            @Nullable ThrowingRunnable<? extends Exception> secondAction) {
        this.epochManager.onNonRecord(
                firstAction == null ? null : () -> runWithoutRecordContext(firstAction),
                secondAction == null ? null : () -> runWithoutRecordContext(secondAction),
                this.epochParallelMode);
    }

    /**
     * Runs {@code action} with the current context cleared, then re-installs the previous context
     * whether or not the action succeeded, and reports any exception to the exception handler.
     */
    private void runWithoutRecordContext(ThrowingRunnable<? extends Exception> action) {
        final RecordContext<K> previousContext = this.currentContext;
        try {
            try {
                // A non-record action must not be attributed to any record.
                setCurrentContext(null);
                action.run();
            } finally {
                // Restore on failure as well, so the controller is not left without its context.
                setCurrentContext(previousContext);
            }
        } catch (Exception e) {
            this.exceptionHandler.handleException("Failed to process non-record.", e);
        }
    }

    /**
     * Exposes the state executor for tests.
     *
     * @return the state executor this controller submits requests to
     */
    @VisibleForTesting
    public StateExecutor getStateExecutor() {
        return stateExecutor;
    }

    /**
     * Returns the current value of the in-flight record counter. Also backs the
     * {@code numInFlightRecords} gauge.
     *
     * @return the number of in-flight records
     */
    @VisibleForTesting
    public int getInFlightRecordNum() {
        final int inFlight = this.inFlightRecordNum.get();
        return inFlight;
    }

    /**
     * Closes the state request buffer.
     *
     * @throws IOException declared by {@link Closeable#close()}
     */
    @Override
    public void close() throws IOException {
        final StateRequestBuffer<K> buffer = this.stateRequestsBuffer;
        buffer.close();
    }
}
