package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.AsyncStateException;
import org.apache.flink.runtime.asyncprocessing.RecordContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationManager;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceWithAsyncState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract base for {@link AbstractStreamOperatorV2} operators that implement
 * {@link AsyncStateProcessingOperator}. Records and non-record events (watermarks, watermark
 * statuses, latency markers, record attributes) are routed through an
 * {@link AsyncExecutionController} whenever {@link #isAsyncStateProcessingEnabled()} returns
 * {@code true}; otherwise the superclass implementations are used.
 *
 * @param <OUT> output type, passed through to {@link AbstractStreamOperatorV2}
 */
@Internal
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.class */
public abstract class AbstractAsyncStateStreamOperatorV2<OUT> extends AbstractStreamOperatorV2<OUT> implements AsyncStateProcessingOperator {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateStreamOperatorV2.class);
    /** Environment of the containing task; supplies the mailbox executor and is used to fail the task externally. */
    private final Environment environment;
    /** The containing task; consulted for its failing/canceled flags before draining on finish/close. */
    private final StreamTask<?, ?> streamTask;
    /** Created in {@code beforeInitializeStateHandler()}; stays {@code null} when no async keyed state backend is available. */
    private AsyncExecutionController asyncExecutionController;
    /** Context built by {@code setAsyncKeyedContextElement} and released by {@code postProcessElement}. */
    private RecordContext currentProcessingContext;
    /** Created in {@code beforeInitializeStateHandler()} and handed to the execution controller. */
    protected DeclarationManager declarationManager;

    public AbstractAsyncStateStreamOperatorV2(StreamOperatorParameters<OUT> streamOperatorParameters, int i) {
        super(streamOperatorParameters, i);
        this.environment = streamOperatorParameters.getContainingTask().getEnvironment();
        this.streamTask = streamOperatorParameters.getContainingTask();
    }

    /**
     * Prepares async state processing before the state handler is initialized.
     *
     * <p>Switches a {@link DefaultKeyedStateStore} to the V2 keyed state API, creates the
     * {@link DeclarationManager}, and, when async processing is enabled and an async keyed state
     * backend exists, builds the {@link AsyncExecutionController} and wires it into that backend.
     *
     * @throws UnsupportedOperationException if async processing is enabled, no async keyed state
     *     backend is available, but a regular keyed state backend is present
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public final void beforeInitializeStateHandler() {
        final KeyedStateStore keyedStateStore = this.stateHandler.getKeyedStateStore().orElse(null);
        if (keyedStateStore instanceof DefaultKeyedStateStore) {
            ((DefaultKeyedStateStore) keyedStateStore).setSupportKeyedStateApiSetV2();
        }

        final int inFlightRecordsLimit = getExecutionConfig().getAsyncInflightRecordsLimit();
        final int bufferSize = getExecutionConfig().getAsyncStateBufferSize();
        final long bufferTimeout = getExecutionConfig().getAsyncStateBufferTimeout();
        final int maxParallelism = getExecutionConfig().getMaxParallelism();

        this.declarationManager = new DeclarationManager();
        if (!isAsyncStateProcessingEnabled()) {
            return;
        }

        final AsyncKeyedStateBackend asyncBackend = this.stateHandler.getAsyncKeyedStateBackend();
        if (asyncBackend != null) {
            this.asyncExecutionController =
                    new AsyncExecutionController(
                            this.environment.getMainMailboxExecutor(),
                            this::handleAsyncStateException,
                            asyncBackend.createStateExecutor(),
                            this.declarationManager,
                            maxParallelism,
                            bufferSize,
                            bufferTimeout,
                            inFlightRecordsLimit,
                            asyncBackend,
                            getMetricGroup().addGroup("asyncStateProcessing"));
            asyncBackend.setup(this.asyncExecutionController);
            if (asyncBackend instanceof AsyncKeyedStateBackendAdaptor) {
                // An adaptor wraps a regular keyed backend, so warn that access is synchronous.
                LOG.warn("A normal KeyedStateBackend({}) is used when enabling the async state processing. Parallel asynchronous processing does not work. All state access will be processed synchronously.", this.stateHandler.getKeyedStateBackend());
            }
        } else if (this.stateHandler.getKeyedStateBackend() != null) {
            // A keyed backend exists but cannot be accessed asynchronously.
            throw new UnsupportedOperationException("Current State Backend doesn't support async access, AsyncExecutionController could not work");
        }
        // Neither backend present: the controller intentionally stays null.
    }

    /**
     * Exception callback handed to the {@link AsyncExecutionController}: wraps the failure in an
     * {@link AsyncStateException} and fails the task through the environment.
     *
     * @param str message describing the failure
     * @param th the underlying cause
     */
    private void handleAsyncStateException(String str, Throwable th) {
        final AsyncStateException wrapped = new AsyncStateException(str, th);
        this.environment.failExternally(wrapped);
    }

    /**
     * Indicates whether this operator routes processing through the async execution controller.
     * This base implementation always returns {@code true}; the other methods of this class fall
     * back to the superclass implementations when an override returns {@code false}.
     *
     * @return {@code true}
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2, org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing
    public boolean isAsyncStateProcessingEnabled() {
        return true;
    }

    /**
     * Returns the element order used by this operator.
     *
     * @return {@link ElementOrder#RECORD_ORDER}
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public ElementOrder getElementOrder() {
        return ElementOrder.RECORD_ORDER;
    }

    /**
     * Builds a record context for the given element, makes it the controller's current context
     * and notifies {@link #newKeySelected(Object)} with the context's key.
     *
     * <p>The context is retained here; the matching release happens in
     * {@link #postProcessElement()}.
     *
     * @param streamRecord the element about to be processed
     * @param keySelector extracts the key from the element's value
     * @throws Exception if the key selector fails
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public final <T> void setAsyncKeyedContextElement(StreamRecord<T> streamRecord, KeySelector<T, ?> keySelector) throws Exception {
        final T value = streamRecord.getValue();
        final Object key = keySelector.getKey(value);
        final RecordContext context = this.asyncExecutionController.buildContext(value, key);
        this.currentProcessingContext = context;
        // Hold a reference for the duration of element processing.
        context.retain();
        this.asyncExecutionController.setCurrentContext(context);
        newKeySelected(context.getKey());
    }

    /**
     * Hook invoked with the current key after a key context has been set, both from
     * {@code setAsyncKeyedContextElement} and from {@code internalSetKeyContextElement}.
     * The default implementation does nothing.
     *
     * @param obj the newly selected key
     */
    public void newKeySelected(Object obj) {
    }

    /**
     * Delegates to the superclass to set the key context, then reports the current key to
     * {@link #newKeySelected(Object)} when a key selector was supplied.
     *
     * @param streamRecord the element whose key becomes the current key
     * @param keySelector key extractor; when {@code null} no key notification is issued
     * @throws Exception propagated from the superclass
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public <T> void internalSetKeyContextElement(StreamRecord<T> streamRecord, KeySelector<T, ?> keySelector) throws Exception {
        super.internalSetKeyContextElement(streamRecord, keySelector);
        if (keySelector == null) {
            return;
        }
        final Object selectedKey = getCurrentKey();
        newKeySelected(selectedKey);
    }

    /**
     * Returns the key of the record context currently installed in the async execution
     * controller, or the superclass's current key when async processing is disabled.
     *
     * @return the current key
     * @throws UnsupportedOperationException if async processing is enabled but no current context
     *     exists, including the case where the controller itself was never created (operator not
     *     yet initialized, or no keyed state backend)
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2, org.apache.flink.streaming.api.operators.KeyContext
    public Object getCurrentKey() {
        if (!isAsyncStateProcessingEnabled()) {
            return super.getCurrentKey();
        }
        // The controller is null before beforeInitializeStateHandler() ran and for operators
        // without a keyed state backend; report that like a missing context rather than with a
        // bare NullPointerException.
        final RecordContext currentContext =
                this.asyncExecutionController == null
                        ? null
                        : this.asyncExecutionController.getCurrentContext();
        if (currentContext == null) {
            throw new UnsupportedOperationException("Have not set the current key yet, this may because the operator has not started to run, or you are invoking this under a non-keyed context.");
        }
        return currentContext.getKey();
    }

    /**
     * Releases the record context stored by {@code setAsyncKeyedContextElement}, balancing the
     * {@code retain()} performed there.
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public final void postProcessElement() {
        this.currentProcessingContext.release();
    }

    /**
     * Submits the given processing logic to the async execution controller as the callback of a
     * sync-point request.
     *
     * @param throwingRunnable the processing logic to run
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public final void preserveRecordOrderAndProcess(ThrowingRunnable<Exception> throwingRunnable) {
        // NOTE(review): the meaning of the boolean flag is defined by
        // AsyncExecutionController.syncPointRequestWithCallback; asyncProcessWithKey passes true.
        this.asyncExecutionController.syncPointRequestWithCallback(throwingRunnable, false);
    }

    /**
     * Runs the given logic under a freshly built context for {@code k}, then restores the context
     * that was current before the call.
     *
     * @param k the key to process under
     * @param throwingRunnable the processing logic, submitted as a sync-point callback
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public <K> void asyncProcessWithKey(K k, ThrowingRunnable<Exception> throwingRunnable) {
        // Remember whatever context is active so it can be reinstated afterwards.
        final RecordContext<K> previousContext = this.asyncExecutionController.getCurrentContext();

        // Switch to a new context bound to the requested key (no record attached).
        final RecordContext<K> keyedContext = this.asyncExecutionController.buildContext(null, k, true);
        keyedContext.retain();
        this.asyncExecutionController.setCurrentContext(keyedContext);

        this.asyncExecutionController.syncPointRequestWithCallback(throwingRunnable, true);
        keyedContext.release();

        // Switch back to the original context.
        this.asyncExecutionController.setCurrentContext(previousContext);
    }

    /**
     * Returns the declaration manager created in {@code beforeInitializeStateHandler()}.
     *
     * @return the declaration manager, or {@code null} if initialization has not run yet
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator
    public final DeclarationManager getDeclarationManager() {
        return this.declarationManager;
    }

    /**
     * Not supported by this class; always throws.
     *
     * @param i input index (unused)
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing
    public final <T> ThrowingConsumer<StreamRecord<T>, Exception> getRecordProcessor(int i) {
        throw new UnsupportedOperationException("Never getRecordProcessor from AbstractAsyncStateStreamOperatorV2, since this part is handled by the Input.");
    }

    /**
     * Obtains a keyed state from the state handler for the given namespace and descriptor.
     *
     * @param n the namespace
     * @param typeSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the requested state
     * @return the state returned by the state handler, cast (unchecked) to {@code S}
     * @throws Exception propagated from the state handler
     */
    protected <N, S extends State, T> S getOrCreateKeyedState(@Nonnull N n, @Nonnull TypeSerializer<N> typeSerializer, @Nonnull StateDescriptor<T> stateDescriptor) throws Exception {
        return (S) this.stateHandler.getOrCreateKeyedState(n, typeSerializer, stateDescriptor);
    }

    /**
     * Drains all in-flight records from the async execution controller before the checkpoint
     * barrier is handled.
     *
     * <p>Nothing is done when async processing is disabled or when no controller was created
     * (which {@code beforeInitializeStateHandler()} permits for operators without a keyed state
     * backend); in the latter case there can be no in-flight records.
     *
     * @param j the checkpoint id (unused here)
     * @throws Exception declared by the overridden method
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2, org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
        if (isAsyncStateProcessingEnabled() && this.asyncExecutionController != null) {
            // Target of 0 in-flight records, i.e. a full drain.
            this.asyncExecutionController.drainInflightRecords(0);
        }
    }

    /**
     * Returns the timer service registered under the given name, wiring the async execution
     * controller into it when it is one of the async-aware implementations.
     *
     * @param str name of the timer service
     * @param typeSerializer serializer for the timer namespace
     * @param triggerable callback invoked when timers fire
     * @return the timer service
     * @throws RuntimeException if the time service manager has not been initialized
     * @throws IllegalStateException if async processing is enabled and the operator has no key
     *     serializer
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public <K, N> InternalTimerService<N> getInternalTimerService(String str, TypeSerializer<N> typeSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        if (isAsyncStateProcessingEnabled()) {
            @SuppressWarnings("unchecked")
            final InternalTimeServiceManager<K> keyedManager = (InternalTimeServiceManager<K>) this.timeServiceManager;
            @SuppressWarnings("unchecked")
            final TypeSerializer<K> keySerializer = (TypeSerializer<K>) this.stateHandler.getKeySerializer();
            Preconditions.checkState(keySerializer != null, "Timers can only be used on keyed operators.");

            final InternalTimerService<N> timerService = keyedManager.getInternalTimerService(str, keySerializer, typeSerializer, triggerable);
            // Only the async-aware implementations need the controller.
            if (timerService instanceof InternalTimerServiceAsyncImpl) {
                ((InternalTimerServiceAsyncImpl) timerService).setup(this.asyncExecutionController);
            } else if (timerService instanceof BatchExecutionInternalTimeServiceWithAsyncState) {
                ((BatchExecutionInternalTimeServiceWithAsyncState) timerService).setup(this.asyncExecutionController);
            }
            return timerService;
        }
        return super.getInternalTimerService(str, typeSerializer, triggerable);
    }

    /**
     * Handles a latency marker. With async processing enabled the superclass handling is passed
     * to the controller as a non-record action; otherwise it runs directly.
     *
     * @param latencyMarker the marker to report or forward
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public void reportOrForwardLatencyMarker(LatencyMarker latencyMarker) {
        if (!isAsyncStateProcessingEnabled()) {
            super.reportOrForwardLatencyMarker(latencyMarker);
            return;
        }
        // No first action; only the second runnable is supplied.
        this.asyncExecutionController.processNonRecord(null, () -> super.reportOrForwardLatencyMarker(latencyMarker));
    }

    /**
     * Hook invoked with each watermark before timers are advanced and the watermark is emitted.
     * The default implementation returns the watermark unchanged. Callers in this class skip
     * advancing timers and emitting when the result is {@code null}.
     *
     * @param watermark the incoming watermark
     * @return the watermark to continue processing with, or {@code null} to suppress it
     * @throws Exception if an override fails
     */
    public Watermark preProcessWatermark(Watermark watermark) throws Exception {
        return watermark;
    }

    /**
     * Hook invoked by {@code processWatermark} after a non-null watermark has been handled.
     * The default implementation does nothing.
     *
     * @param watermark the watermark that was processed
     * @throws Exception if an override fails
     */
    public void postProcessWatermark(Watermark watermark) throws Exception {
    }

    /**
     * Processes a watermark, passing it through {@link #preProcessWatermark(Watermark)} and
     * {@link #postProcessWatermark(Watermark)}.
     *
     * <p>With async processing disabled the superclass handles the pre-processed watermark
     * directly. Otherwise two actions are handed to the controller: the first pre-processes the
     * watermark and advances the time service manager, the second emits the watermark and runs
     * the post-processing hook. A {@code null} result from pre-processing suppresses everything
     * that follows.
     *
     * @param watermark the incoming watermark
     * @throws Exception if a hook or the superclass fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public final void processWatermark(Watermark watermark) throws Exception {
        if (!isAsyncStateProcessingEnabled()) {
            final Watermark preProcessed = preProcessWatermark(watermark);
            if (preProcessed != null) {
                super.processWatermark(preProcessed);
                postProcessWatermark(preProcessed);
            }
            return;
        }

        // Carries the pre-processed watermark from the first action to the second one.
        final AtomicReference<Watermark> watermarkRef = new AtomicReference<>();
        // NOTE(review): when each action runs is decided by AsyncExecutionController.processNonRecord.
        this.asyncExecutionController.processNonRecord(
                () -> {
                    watermarkRef.set(preProcessWatermark(watermark));
                    if (this.timeServiceManager != null && watermarkRef.get() != null) {
                        this.timeServiceManager.advanceWatermark(watermarkRef.get());
                    }
                },
                () -> {
                    final Watermark toEmit = watermarkRef.get();
                    if (toEmit != null) {
                        this.output.emitWatermark(toEmit);
                        postProcessWatermark(toEmit);
                    }
                });
    }

    /**
     * Processes a watermark status change for one input.
     *
     * <p>With async processing disabled the superclass handles it. Otherwise two actions are
     * handed to the controller. The first records the previous idle state, updates the combined
     * watermark for input {@code i - 1}, and, if that update reports a change, pre-processes the
     * new combined watermark and advances the time service manager. The second emits the
     * resulting watermark (if any) and forwards the status when the combined idle state flipped.
     *
     * @param watermarkStatus the new status of the input
     * @param i 1-based input index
     * @throws Exception if the superclass or a hook fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public void processWatermarkStatus(WatermarkStatus watermarkStatus, int i) throws Exception {
        if (!isAsyncStateProcessingEnabled()) {
            super.processWatermarkStatus(watermarkStatus, i);
            return;
        }

        final AtomicBoolean wasIdle = new AtomicBoolean(false);
        // Carries the pre-processed combined watermark from the first action to the second one.
        final AtomicReference<Watermark> watermarkRef = new AtomicReference<>();
        this.asyncExecutionController.processNonRecord(
                () -> {
                    wasIdle.set(this.combinedWatermark.isIdle());
                    if (this.combinedWatermark.updateStatus(i - 1, watermarkStatus.isIdle())) {
                        watermarkRef.set(preProcessWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark())));
                        if (this.timeServiceManager != null && watermarkRef.get() != null) {
                            this.timeServiceManager.advanceWatermark(watermarkRef.get());
                        }
                    }
                },
                () -> {
                    final Watermark toEmit = watermarkRef.get();
                    if (toEmit != null) {
                        // NOTE(review): unlike processWatermark, postProcessWatermark is not
                        // invoked on this path - confirm that this is intended.
                        this.output.emitWatermark(toEmit);
                    }
                    if (wasIdle.get() != this.combinedWatermark.isIdle()) {
                        this.output.emitWatermarkStatus(watermarkStatus);
                    }
                });
    }

    /**
     * Handles record attributes for one input. With async processing enabled the superclass
     * handling is passed to the controller as a non-record action; otherwise it runs directly.
     *
     * @param recordAttributes the attributes received
     * @param i index of the input they arrived on
     * @throws Exception if the superclass handling fails
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2
    public void processRecordAttributes(RecordAttributes recordAttributes, int i) throws Exception {
        if (!isAsyncStateProcessingEnabled()) {
            super.processRecordAttributes(recordAttributes, i);
            return;
        }
        // No first action; only the second runnable is supplied.
        this.asyncExecutionController.processNonRecord(null, () -> super.processRecordAttributes(recordAttributes, i));
    }

    /**
     * Exposes the async execution controller for tests.
     *
     * @return the controller, or {@code null} if {@code beforeInitializeStateHandler()} did not
     *     create one
     */
    @VisibleForTesting
    public AsyncExecutionController<?> getAsyncExecutionController() {
        return this.asyncExecutionController;
    }

    /**
     * Exposes, for tests, the record context most recently built by
     * {@code setAsyncKeyedContextElement}.
     *
     * @return the stored context, or {@code null} if none has been set yet
     */
    @VisibleForTesting
    public RecordContext getCurrentProcessingContext() {
        return this.currentProcessingContext;
    }

    /**
     * Finishes the operator via the superclass, then drains in-flight async records if the
     * conditions checked by {@code closeIfNeeded()} allow it.
     *
     * @throws Exception propagated from the superclass
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2, org.apache.flink.streaming.api.operators.StreamOperator
    public void finish() throws Exception {
        super.finish();
        closeIfNeeded();
    }

    /**
     * Closes the operator via the superclass, then drains in-flight async records if the
     * conditions checked by {@code closeIfNeeded()} allow it.
     *
     * @throws Exception propagated from the superclass
     */
    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2, org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        super.close();
        closeIfNeeded();
    }

    /**
     * Drains all in-flight records from the async execution controller, unless async processing
     * is disabled, no controller was ever created, or the containing task is failing or canceled.
     *
     * <p>The controller can legitimately be {@code null}: {@code beforeInitializeStateHandler()}
     * leaves it unset when the operator has no keyed state backend. In that case there is
     * nothing to drain, and {@code finish()}/{@code close()} must not fail.
     */
    private void closeIfNeeded() {
        if (!isAsyncStateProcessingEnabled() || this.asyncExecutionController == null) {
            return;
        }
        if (this.streamTask.isFailing() || this.streamTask.isCanceled()) {
            return;
        }
        // Target of 0 in-flight records, i.e. a full drain.
        this.asyncExecutionController.drainInflightRecords(0);
    }
}
