package org.apache.flink.streaming.api.runners.python.beam;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.BeamUrns;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.Timer;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.ImmutableExecutableStage;
import org.apache.beam.runners.core.construction.graph.PipelineNode;
import org.apache.beam.runners.core.construction.graph.TimerReference;
import org.apache.beam.runners.fnexecution.control.BundleProgressHandler;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.runners.fnexecution.control.RemoteBundle;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.runners.fnexecution.control.TimerReceiverFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.Constants;
import org.apache.flink.python.PythonFunctionRunner;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.env.PythonEnvironment;
import org.apache.flink.python.env.process.ProcessPythonEnvironment;
import org.apache.flink.python.env.process.ProcessPythonEnvironmentManager;
import org.apache.flink.python.metric.process.FlinkMetricContainer;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistration;
import org.apache.flink.streaming.api.operators.python.process.timer.TimerRegistrationAction;
import org.apache.flink.streaming.api.runners.python.beam.state.BeamStateRequestHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for {@link PythonFunctionRunner} implementations that execute Python functions
 * through Beam's portability (fn-execution) classes.
 *
 * <p>Inputs are handed to a remote bundle as raw byte arrays; outputs produced by the bundle are
 * buffered in {@link #resultBuffer} and retrieved through {@link #pollResult()} /
 * {@link #takeResult()}. Subclasses supply the transforms, timers and the optional timer coder of
 * the executable stage via the abstract methods of this class.
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/BeamPythonFunctionRunner.class */
public abstract class BeamPythonFunctionRunner implements PythonFunctionRunner {
    protected static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamPythonFunctionRunner.class);
    // Ids under which the main input / main output coders are registered in the stage components.
    private static final String INPUT_CODER_ID = "input_coder";
    private static final String OUTPUT_CODER_ID = "output_coder";
    // Key used to look up the shared Python resources in the memory manager.
    private static final String MANAGED_MEMORY_RESOURCE_ID = "python-process-managed-memory";
    // Name of the environment variable that carries the memory limit to the Python worker
    // (set to -1 when managed memory is not used).
    private static final String PYTHON_WORKER_MEMORY_LIMIT = "_PYTHON_WORKER_MEMORY_LIMIT";
    // Used both as job id and job name of the Beam JobInfo, and as the scope for metric updates.
    private final String taskName;
    // Creates the Python execution environment and the retrieval token; exposes the boot log.
    private final ProcessPythonEnvironmentManager environmentManager;

    // Receives monitoring infos reported by bundles; progress is ignored when null.
    @Nullable
    private final FlinkMetricContainer flinkMetricContainer;

    // State backends and serializers handed to the Beam state request handler.
    @Nullable
    private final KeyedStateBackend<?> keyedStateBackend;

    @Nullable
    private final OperatorStateBackend operatorStateBackend;

    @Nullable
    private final TypeSerializer<?> keySerializer;

    @Nullable
    private final TypeSerializer<?> namespaceSerializer;

    // Passed to every TimerRegistrationAction created for timers emitted by the bundle.
    @Nullable
    private final TimerRegistration timerRegistration;
    // Source of the shared Python resources when managed memory is used; may be null.
    private final MemoryManager memoryManager;
    // Fraction of managed memory requested for the Python worker; only honoured within (0, 1].
    private final double managedMemoryFraction;
    protected final FlinkFnApi.CoderInfoDescriptor inputCoderDescriptor;
    protected final FlinkFnApi.CoderInfoDescriptor outputCoderDescriptor;
    // Side output coder descriptors keyed by the side output's PCollection id.
    protected final Map<String, FlinkFnApi.CoderInfoDescriptor> sideOutputCoderDescriptors;
    // True between the lazy start of a bundle and the next flush().
    private transient boolean bundleStarted;
    // Only assigned when the runner does not use shared (managed memory) resources.
    private transient JobBundleFactory jobBundleFactory;
    private transient StageBundleFactory stageBundleFactory;
    private transient StateRequestHandler stateRequestHandler;
    private transient BundleProgressHandler progressHandler;
    // The bundle currently in flight; reset to null when the bundle is finished.
    private transient RemoteBundle remoteBundle;
    // Single instance returned by pollResult()/takeResult(); it is overwritten on every call.
    private transient Tuple3<String, byte[], Integer> reusableResultTuple;

    // Buffer of (output id, payload) pairs filled by the output receivers of the bundle.
    @VisibleForTesting
    protected transient LinkedBlockingQueue<Tuple2<String, byte[]>> resultBuffer;

    // Receiver of the main input of the current bundle.
    @VisibleForTesting
    protected transient FnDataReceiver<WindowedValue<byte[]>> mainInputReceiver;
    // Receiver of timer input of the current bundle; resolved lazily by processTimer().
    private transient FnDataReceiver<Timer> timerInputReceiver;
    // Shared Python resources obtained from the memory manager; null when managed memory is unused.
    private transient OpaqueMemoryResource<PythonSharedResources> sharedResources;
    private transient Thread shutdownHook;
    // Provides the main mailbox executor on which timer registrations are scheduled.
    private transient Environment environment;
    // Synchronized list of timer registrations that were scheduled but possibly not yet executed.
    private volatile transient List<TimerRegistrationAction> unregisteredTimers;

    /**
     * Creates a runner; no resources are acquired until {@link #open(ReadableConfig)} is called.
     *
     * @param environment task environment, used to reach the main mailbox executor
     * @param str task name, must not be null
     * @param processPythonEnvironmentManager manager of the Python process environment, must not be null
     * @param flinkMetricContainer metric container, or null to ignore bundle progress
     * @param keyedStateBackend keyed state backend handed to the state request handler, may be null
     * @param operatorStateBackend operator state backend handed to the state request handler, may be null
     * @param typeSerializer key serializer, may be null
     * @param typeSerializer2 namespace serializer, may be null
     * @param timerRegistration target of timer registrations, may be null
     * @param memoryManager memory manager used to obtain shared Python resources
     * @param d managed memory fraction for the Python worker
     * @param coderInfoDescriptor descriptor of the input coder, must not be null
     * @param coderInfoDescriptor2 descriptor of the output coder, must not be null
     * @param map side output coder descriptors keyed by output id, must not be null
     */
    public BeamPythonFunctionRunner(
            Environment environment,
            String str,
            ProcessPythonEnvironmentManager processPythonEnvironmentManager,
            @Nullable FlinkMetricContainer flinkMetricContainer,
            @Nullable KeyedStateBackend<?> keyedStateBackend,
            @Nullable OperatorStateBackend operatorStateBackend,
            @Nullable TypeSerializer<?> typeSerializer,
            @Nullable TypeSerializer<?> typeSerializer2,
            @Nullable TimerRegistration timerRegistration,
            MemoryManager memoryManager,
            double d,
            FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor,
            FlinkFnApi.CoderInfoDescriptor coderInfoDescriptor2,
            Map<String, FlinkFnApi.CoderInfoDescriptor> map) {
        // Mandatory arguments, validated in declaration order.
        this.taskName = Preconditions.checkNotNull(str);
        this.environmentManager = Preconditions.checkNotNull(processPythonEnvironmentManager);
        this.inputCoderDescriptor = Preconditions.checkNotNull(coderInfoDescriptor);
        this.outputCoderDescriptor = Preconditions.checkNotNull(coderInfoDescriptor2);
        this.sideOutputCoderDescriptors = Preconditions.checkNotNull(map);

        // Optional / unchecked arguments.
        this.environment = environment;
        this.flinkMetricContainer = flinkMetricContainer;
        this.keyedStateBackend = keyedStateBackend;
        this.operatorStateBackend = operatorStateBackend;
        this.keySerializer = typeSerializer;
        this.namespaceSerializer = typeSerializer2;
        this.timerRegistration = timerRegistration;
        this.memoryManager = memoryManager;
        this.managedMemoryFraction = d;
    }

    /**
     * Initializes the runner: creates the result buffer and state request handler, opens the
     * environment manager and builds the Beam bundle factories.
     *
     * <p>When a memory manager is available, managed memory is enabled in the configuration and
     * the configured fraction lies within (0, 1], the job bundle factory and Python environment
     * are obtained as a shared resource from the memory manager. Otherwise they are created
     * privately for this runner and the worker memory limit is passed as -1.
     *
     * @param readableConfig configuration to read the Python options from
     * @throws Exception if any of the underlying components fails to initialize
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void open(ReadableConfig readableConfig) throws Exception {
        this.bundleStarted = false;
        this.resultBuffer = new LinkedBlockingQueue<>();
        this.reusableResultTuple = new Tuple3<>();
        this.stateRequestHandler = getStateRequestHandler(this.keyedStateBackend, this.operatorStateBackend, this.keySerializer, this.namespaceSerializer, readableConfig);

        // Opened before the bundle factories are created, since creating them calls into the manager.
        this.environmentManager.open();

        // Only the creation of the pipeline options runs under the temporary class loader context;
        // the context is closed exactly once, whether or not the creation succeeds.
        PortablePipelineOptions portablePipelineOptions;
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
            portablePipelineOptions = PipelineOptionsFactory.as(PortablePipelineOptions.class);
        }

        int stateCacheSize = readableConfig.get(PythonOptions.STATE_CACHE_SIZE);
        if (stateCacheSize > 0) {
            portablePipelineOptions.as(ExperimentalOptions.class).setExperiments(Collections.singletonList("state_cache_size=" + stateCacheSize));
        }
        final Struct proto = PipelineOptionsTranslation.toProto(portablePipelineOptions);

        final boolean managedMemoryRequested = this.memoryManager != null && readableConfig.get(PythonOptions.USE_MANAGED_MEMORY);
        final boolean fractionOutOfRange = this.managedMemoryFraction <= 0.0d || this.managedMemoryFraction > 1.0d;

        if (managedMemoryRequested && !fractionOutOfRange) {
            this.sharedResources = this.memoryManager.getSharedMemoryResourceForManagedMemory(
                    MANAGED_MEMORY_RESOURCE_ID,
                    size -> new PythonSharedResources(createJobBundleFactory(proto), createPythonExecutionEnvironment(readableConfig, size)),
                    this.managedMemoryFraction);
            LOG.info("Obtained shared Python process of size {} bytes", this.sharedResources.getSize());
            PythonSharedResources shared = this.sharedResources.getResourceHandle();
            shared.addPythonEnvironmentManager(this.environmentManager);
            this.stageBundleFactory = createStageBundleFactory(shared.getJobBundleFactory(), shared.getEnvironment());
        } else {
            if (managedMemoryRequested) {
                // Managed memory was requested but the fraction is unusable: warn and fall back.
                LOG.warn(String.format("The configured managed memory fraction for Python worker process must be within (0, 1], was: %s, use off-heap memory instead.Please see config option \"taskmanager.memory.managed.consumer-weights\" for more details.", Double.valueOf(this.managedMemoryFraction)));
            }
            this.jobBundleFactory = createJobBundleFactory(proto);
            this.stageBundleFactory = createStageBundleFactory(this.jobBundleFactory, createPythonExecutionEnvironment(readableConfig, -1L));
        }

        this.progressHandler = getProgressHandler(this.flinkMetricContainer);
        this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
        this.unregisteredTimers = Collections.synchronizedList(new LinkedList<>());
    }

    /**
     * Releases the resources acquired in {@link #open(ReadableConfig)}.
     *
     * <p>Every resource is closed even if closing an earlier one fails, and the shutdown hook is
     * always removed. The first failure is rethrown after cleanup; later failures are attached to
     * it as suppressed exceptions.
     *
     * @throws Exception the first exception raised while closing a resource
     */
    @Override // org.apache.flink.python.PythonFunctionRunner, java.lang.AutoCloseable
    public void close() throws Exception {
        Exception failure = null;

        try {
            if (this.jobBundleFactory != null) {
                this.jobBundleFactory.close();
            }
        } catch (Exception e) {
            failure = e;
        } finally {
            this.jobBundleFactory = null;
        }

        try {
            if (this.sharedResources != null) {
                // The environment manager was registered with the shared resources in open(),
                // so it is not closed directly on this path.
                this.sharedResources.close();
            } else {
                this.environmentManager.close();
            }
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        } finally {
            this.sharedResources = null;
        }

        if (this.shutdownHook != null) {
            ShutdownHookUtil.removeShutdownHook(this.shutdownHook, BeamPythonFunctionRunner.class.getSimpleName(), LOG);
            this.shutdownHook = null;
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Sends one serialized input element to the Python worker, starting a bundle first if none
     * is currently open.
     *
     * @param bArr the serialized element
     * @throws Exception if the main input receiver rejects the element
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void process(byte[] bArr) throws Exception {
        checkInvokeStartBundle();
        final WindowedValue<byte[]> element = WindowedValue.valueInGlobalWindow(bArr);
        this.mainInputReceiver.accept(element);
    }

    /**
     * Registers every pending timer registration action and then empties the pending list. The
     * whole operation holds the list's monitor so that no action can be added in between.
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void drainUnregisteredTimers() {
        final List<TimerRegistrationAction> pending = this.unregisteredTimers;
        synchronized (pending) {
            for (TimerRegistrationAction action : pending) {
                action.registerTimer();
            }
            pending.clear();
        }
    }

    /**
     * Sends serialized timer data to the Python worker.
     *
     * <p>The timer receiver is resolved lazily from the current bundle (starting one if
     * necessary) and cached until the bundle is finished.
     *
     * @param bArr the serialized timer data, used as the user key of the Beam timer
     * @throws Exception if the timer receiver rejects the timer
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void processTimer(byte[] bArr) throws Exception {
        if (this.timerInputReceiver == null) {
            checkInvokeStartBundle();
            // Exactly one timer receiver is expected; getOnlyElement fails otherwise.
            this.timerInputReceiver = Preconditions.checkNotNull(
                    Iterables.getOnlyElement(this.remoteBundle.getTimerReceivers().values()),
                    "Failed to retrieve timer input receiver.");
        }
        this.timerInputReceiver.accept(Timer.cleared(bArr, "", Collections.emptyList()));
    }

    /** Starts a new bundle unless one is already open, and records that a bundle is open. */
    private void checkInvokeStartBundle() {
        if (!this.bundleStarted) {
            startBundle();
            this.bundleStarted = true;
        }
    }

    /**
     * Obtains a new remote bundle from the stage bundle factory and resolves its single main
     * input receiver.
     *
     * @throws RuntimeException wrapping any failure raised while starting the bundle
     */
    @VisibleForTesting
    protected void startBundle() {
        try {
            final OutputReceiverFactory outputReceivers = createOutputReceiverFactory();
            final TimerReceiverFactory timerReceivers = createTimerReceiverFactory();
            this.remoteBundle = this.stageBundleFactory.getBundle(outputReceivers, timerReceivers, this.stateRequestHandler, this.progressHandler);
            this.mainInputReceiver = Preconditions.checkNotNull(
                    Iterables.getOnlyElement(this.remoteBundle.getInputReceivers().values()),
                    "Failed to retrieve main input receiver.");
        } catch (Throwable cause) {
            throw new RuntimeException("Failed to start remote bundle", cause);
        }
    }

    /**
     * Retrieves the next buffered result without blocking.
     *
     * @return the reusable tuple of (output id, payload, payload length), or {@code null} when no
     *     result is buffered; the returned instance is overwritten by the next call
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public Tuple3<String, byte[], Integer> pollResult() throws Exception {
        final Tuple2<String, byte[]> next = this.resultBuffer.poll();
        if (next != null) {
            final Tuple3<String, byte[], Integer> result = this.reusableResultTuple;
            result.f0 = next.f0;
            result.f1 = next.f1;
            result.f2 = next.f1.length;
            return result;
        }
        return null;
    }

    /**
     * Retrieves the next buffered result, blocking until one is available.
     *
     * @return the reusable tuple of (output id, payload, payload length); the returned instance is
     *     overwritten by the next call
     * @throws Exception if the thread is interrupted while waiting
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public Tuple3<String, byte[], Integer> takeResult() throws Exception {
        final Tuple2<String, byte[]> next = this.resultBuffer.take();
        final Tuple3<String, byte[], Integer> result = this.reusableResultTuple;
        result.f0 = next.f0;
        result.f1 = next.f1;
        result.f2 = next.f1.length;
        return result;
    }

    /**
     * Finishes the currently open bundle, if any. The runner is marked as having no open bundle
     * even when finishing fails.
     */
    @Override // org.apache.flink.python.PythonFunctionRunner
    public void flush() throws Exception {
        if (!this.bundleStarted) {
            return;
        }
        try {
            finishBundle();
        } finally {
            this.bundleStarted = false;
        }
    }

    /**
     * Enqueues a sentinel result with a {@code null} output id and an empty payload, which wakes
     * up a caller blocked in {@link #takeResult()}.
     */
    public void notifyNoMoreResults() {
        // Explicit type arguments: a bare null must be typed as the String output id.
        this.resultBuffer.add(Tuple2.<String, byte[]>of(null, new byte[0]));
    }

    /**
     * Closes the current remote bundle and drops the references to the bundle and its receivers,
     * regardless of whether closing succeeded.
     *
     * @throws RuntimeException wrapping any failure raised while closing the bundle
     */
    private void finishBundle() {
        try {
            this.remoteBundle.close();
        } catch (Throwable cause) {
            // Wrapped so that this method needs no throws clause, mirroring startBundle().
            throw new RuntimeException("Failed to close remote bundle", cause);
        } finally {
            this.remoteBundle = null;
            this.mainInputReceiver = null;
            this.timerInputReceiver = null;
        }
    }

    /**
     * Builds the Beam process environment used to launch the Python worker.
     *
     * <p>The environment variables of the process environment are extended in place with the
     * configured Python job options (if any) and with the worker memory limit.
     *
     * @param readableConfig configuration to read the Python job options from
     * @param j memory limit exported to the worker through {@code _PYTHON_WORKER_MEMORY_LIMIT}
     * @return the Beam process environment
     * @throws RuntimeException if the environment manager does not create a process environment
     */
    private RunnerApi.Environment createPythonExecutionEnvironment(ReadableConfig readableConfig, long j) throws Exception {
        final PythonEnvironment created = this.environmentManager.createEnvironment();
        if (created instanceof ProcessPythonEnvironment) {
            final ProcessPythonEnvironment processEnvironment = (ProcessPythonEnvironment) created;
            final Map<String, String> variables = processEnvironment.getEnv();
            final Optional<Map<String, String>> jobOptions = readableConfig.getOptional(PythonOptions.PYTHON_JOB_OPTIONS);
            jobOptions.ifPresent(variables::putAll);
            variables.put(PYTHON_WORKER_MEMORY_LIMIT, String.valueOf(j));
            return Environments.createProcessEnvironment("", "", processEnvironment.getCommand(), variables);
        }
        throw new RuntimeException("Currently only ProcessPythonEnvironment is supported.");
    }

    /**
     * Assembles the Beam executable stage that describes the Python function.
     *
     * <p>The stage has one input PCollection ({@code "input"}), a main output PCollection with the
     * empty id, and one additional output PCollection per side output. All PCollections share the
     * same windowing strategy. Transforms, timers and the optional timer coder are contributed by
     * the subclass.
     *
     * @param environment the Beam environment the stage runs in
     * @return the executable stage
     */
    private ExecutableStage createExecutableStage(RunnerApi.Environment environment) throws Exception {
        final RunnerApi.Components.Builder components = RunnerApi.Components.newBuilder();

        // Main input and main output PCollections.
        final RunnerApi.PCollection inputPCollection = RunnerApi.PCollection.newBuilder()
                .setWindowingStrategyId(Constants.WINDOW_STRATEGY)
                .setCoderId(INPUT_CODER_ID)
                .build();
        components.putPcollections("input", inputPCollection);
        final RunnerApi.PCollection outputPCollection = RunnerApi.PCollection.newBuilder()
                .setWindowingStrategyId(Constants.WINDOW_STRATEGY)
                .setCoderId(OUTPUT_CODER_ID)
                .build();
        components.putPcollections("", outputPCollection);

        // Windowing strategy and the coders for input, output and window.
        final RunnerApi.WindowingStrategy windowingStrategy = RunnerApi.WindowingStrategy.newBuilder()
                .setWindowCoderId(Constants.WINDOW_CODER_ID)
                .build();
        components.putWindowingStrategies(Constants.WINDOW_STRATEGY, windowingStrategy);
        components.putCoders(INPUT_CODER_ID, ProtoUtils.createCoderProto(this.inputCoderDescriptor));
        components.putCoders(OUTPUT_CODER_ID, ProtoUtils.createCoderProto(this.outputCoderDescriptor));
        components.putCoders(Constants.WINDOW_CODER_ID, getWindowCoderProto());

        // One PCollection plus coder per side output; the coder id is derived from the output id.
        for (Map.Entry<String, FlinkFnApi.CoderInfoDescriptor> sideOutput : this.sideOutputCoderDescriptors.entrySet()) {
            final String sideOutputId = sideOutput.getKey();
            final String sideCoderId = "side_coder-" + sideOutputId;
            final RunnerApi.PCollection sidePCollection = RunnerApi.PCollection.newBuilder()
                    .setWindowingStrategyId(Constants.WINDOW_STRATEGY)
                    .setCoderId(sideCoderId)
                    .build();
            components.putPcollections(sideOutputId, sidePCollection);
            components.putCoders(sideCoderId, ProtoUtils.createCoderProto(sideOutput.getValue()));
        }

        // When the subclass provides a timer coder, register it together with a wrapping timer
        // coder composed of the timer coder and the window coder.
        getOptionalTimerCoderProto().ifPresent(timerCoder -> {
            components.putCoders(Constants.TIMER_CODER_ID, timerCoder);
            final RunnerApi.FunctionSpec timerSpec = RunnerApi.FunctionSpec.newBuilder()
                    .setUrn(ModelCoders.TIMER_CODER_URN)
                    .build();
            final RunnerApi.Coder wrapperTimerCoder = RunnerApi.Coder.newBuilder()
                    .setSpec(timerSpec)
                    .addComponentCoderIds(Constants.TIMER_CODER_ID)
                    .addComponentCoderIds(Constants.WINDOW_CODER_ID)
                    .build();
            components.putCoders(Constants.WRAPPER_TIMER_CODER_ID, wrapperTimerCoder);
        });

        buildTransforms(components);
        final RunnerApi.Components built = components.build();

        final PipelineNode.PCollectionNode inputNode = PipelineNode.pCollection("input", built.getPcollectionsOrThrow("input"));
        final List<TimerReference> timers = getTimers(built);

        final List<PipelineNode.PTransformNode> transformNodes = new ArrayList<>();
        for (String transformId : built.getTransformsMap().keySet()) {
            transformNodes.add(PipelineNode.pTransform(transformId, built.getTransformsOrThrow(transformId)));
        }

        // Main output first, followed by the side outputs.
        final List<PipelineNode.PCollectionNode> outputNodes = new ArrayList<>();
        outputNodes.add(PipelineNode.pCollection("", built.getPcollectionsOrThrow("")));
        for (String sideOutputId : this.sideOutputCoderDescriptors.keySet()) {
            outputNodes.add(PipelineNode.pCollection(sideOutputId, built.getPcollectionsOrThrow(sideOutputId)));
        }

        // No side inputs and no user states are declared for the stage.
        return ImmutableExecutableStage.of(
                built,
                environment,
                inputNode,
                Collections.emptyList(),
                Collections.emptyList(),
                timers,
                transformNodes,
                outputNodes,
                createValueOnlyWireCoderSetting());
    }

    /**
     * Creates the wire coder settings for the input, the main output and every side output.
     *
     * <p>Each setting uses the {@code PARAM_WINDOWED_VALUE} coder URN with, as payload, an empty
     * byte array in the global window encoded by the full windowed value coder.
     *
     * @return one setting per input/output id, in the order input, main output, side outputs
     * @throws IOException if encoding the payload fails
     */
    private Collection<RunnerApi.ExecutableStagePayload.WireCoderSetting> createValueOnlyWireCoderSetting() throws IOException {
        final WindowedValue<byte[]> emptyValue = WindowedValue.valueInGlobalWindow(new byte[0]);
        final WindowedValue.FullWindowedValueCoder<byte[]> windowedValueCoder =
                WindowedValue.FullWindowedValueCoder.of(ByteArrayCoder.of(), GlobalWindow.Coder.INSTANCE);
        final ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        windowedValueCoder.encode(emptyValue, encoded);

        final List<String> inputOrOutputIds = new ArrayList<>();
        inputOrOutputIds.add("input");
        inputOrOutputIds.add("");
        inputOrOutputIds.addAll(this.sideOutputCoderDescriptors.keySet());

        final List<RunnerApi.ExecutableStagePayload.WireCoderSetting> settings = new ArrayList<>();
        for (String id : inputOrOutputIds) {
            settings.add(
                    RunnerApi.ExecutableStagePayload.WireCoderSetting.newBuilder()
                            .setUrn(BeamUrns.getUrn(RunnerApi.StandardCoders.Enum.PARAM_WINDOWED_VALUE))
                            .setPayload(ByteString.copyFrom(encoded.toByteArray()))
                            .setInputOrOutputId(id)
                            .build());
        }
        return settings;
    }

    /** Returns the coder proto for the window coder, identified by the global window coder URN. */
    private RunnerApi.Coder getWindowCoderProto() {
        final RunnerApi.FunctionSpec globalWindowSpec = RunnerApi.FunctionSpec.newBuilder()
                .setUrn(ModelCoders.GLOBAL_WINDOW_CODER_URN)
                .build();
        return RunnerApi.Coder.newBuilder().setSpec(globalWindowSpec).build();
    }

    /**
     * Adds the transforms of the executable stage to the given components builder. Called after
     * the PCollections, windowing strategy and coders have been registered and before the
     * components are built.
     *
     * @param builder the components builder to add the transforms to
     */
    protected abstract void buildTransforms(RunnerApi.Components.Builder builder);

    /**
     * Returns the timer references that are declared on the executable stage.
     *
     * @param components the fully built components of the stage
     * @return the timer references passed to the executable stage
     */
    protected abstract List<TimerReference> getTimers(RunnerApi.Components components);

    /**
     * Returns the coder proto for timers, if any. When present, it is registered in the stage
     * components together with a wrapping timer coder; when empty, no timer coders are registered.
     *
     * @return the optional timer coder proto
     */
    protected abstract Optional<RunnerApi.Coder> getOptionalTimerCoderProto();

    /**
     * Creates the factory for output receivers. Every receiver appends the received value,
     * tagged with the id it was created for, to the result buffer.
     */
    private OutputReceiverFactory createOutputReceiverFactory() {
        return new OutputReceiverFactory() {
            @Override // org.apache.beam.runners.fnexecution.control.OutputReceiverFactory
            public FnDataReceiver<WindowedValue<byte[]>> create(String outputId) {
                return output -> resultBuffer.add(Tuple2.of(outputId, (byte[]) output.getValue()));
            }
        };
    }

    /**
     * Creates the Beam job bundle factory while the thread's context class loader is temporarily
     * set to the class loader of this class.
     *
     * @param struct the pipeline options in proto form
     * @return the job bundle factory for a job named after the task
     */
    @VisibleForTesting
    public JobBundleFactory createJobBundleFactory(Struct struct) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
            final JobInfo jobInfo = JobInfo.create(this.taskName, this.taskName, this.environmentManager.createRetrievalToken(), struct);
            return DefaultJobBundleFactory.create(jobInfo);
        }
    }

    /**
     * Creates the stage bundle factory for the executable stage of this runner while the thread's
     * context class loader is temporarily set to the class loader of this class.
     *
     * <p>The class loader context is restored on both the success and the failure path.
     *
     * @param jobBundleFactory the job bundle factory to derive the stage bundle factory from
     * @param environment the Beam environment the stage runs in
     * @return the stage bundle factory
     * @throws RuntimeException carrying the environment manager's boot log as message and the
     *     original failure as cause, if the stage bundle factory cannot be created
     */
    private StageBundleFactory createStageBundleFactory(JobBundleFactory jobBundleFactory, RunnerApi.Environment environment) throws Exception {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
            return jobBundleFactory.forStage(createExecutableStage(environment));
        } catch (Throwable cause) {
            // The resource is already closed here; the boot log is attached to help diagnose
            // failures of the Python worker start-up.
            throw new RuntimeException(this.environmentManager.getBootLog(), cause);
        }
    }

    /**
     * Creates the bundle progress handler.
     *
     * @param flinkMetricContainer the metric container to update, or null
     * @return a handler that ignores progress when no container is given, otherwise a handler
     *     that forwards the monitoring infos of progress and completion responses to the container
     */
    private BundleProgressHandler getProgressHandler(final FlinkMetricContainer flinkMetricContainer) {
        if (flinkMetricContainer == null) {
            return BundleProgressHandler.ignored();
        }
        return new BundleProgressHandler() {
            @Override // org.apache.beam.runners.fnexecution.control.BundleProgressHandler
            public void onProgress(BeamFnApi.ProcessBundleProgressResponse progress) {
                flinkMetricContainer.updateMetrics(taskName, progress.getMonitoringInfosList());
            }

            @Override // org.apache.beam.runners.fnexecution.control.BundleProgressHandler
            public void onCompleted(BeamFnApi.ProcessBundleResponse response) {
                flinkMetricContainer.updateMetrics(taskName, response.getMonitoringInfosList());
            }
        };
    }

    /**
     * Creates the factory for timer receivers. For every timer received from the bundle, a
     * registration action is added to the pending list and scheduled on the main mailbox
     * executor.
     */
    private TimerReceiverFactory createTimerReceiverFactory() {
        return new TimerReceiverFactory(
                this.stageBundleFactory,
                (timer, timerData) -> {
                    final byte[] userKey = (byte[]) timer.getUserKey();
                    final TimerRegistrationAction action = new TimerRegistrationAction(this.timerRegistration, userKey, this.unregisteredTimers);
                    this.unregisteredTimers.add(action);
                    final MailboxExecutor mailbox = this.environment.getMainMailboxExecutor();
                    mailbox.execute(action::run, "PythonTimerRegistration");
                },
                null);
    }

    /**
     * Creates the state request handler that serves state requests of the bundle from the given
     * state backends.
     */
    private static StateRequestHandler getStateRequestHandler(
            @Nullable KeyedStateBackend<?> keyedStateBackend,
            @Nullable OperatorStateBackend operatorStateBackend,
            @Nullable TypeSerializer<?> typeSerializer,
            @Nullable TypeSerializer<?> typeSerializer2,
            ReadableConfig readableConfig) {
        final StateRequestHandler handler =
                BeamStateRequestHandler.of(keyedStateBackend, operatorStateBackend, typeSerializer, typeSerializer2, readableConfig);
        return handler;
    }
}
