package org.apache.flink.streaming.api.operators.python.embedded;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator;
import org.apache.flink.streaming.api.utils.PythonOperatorUtils;
import org.apache.flink.streaming.api.utils.PythonTypeUtils;
import org.apache.flink.table.runtime.operators.window.Window;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import pemja.core.object.PyIterator;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator.class */
public class EmbeddedPythonWindowOperator<K, IN, OUT, W extends Window> extends AbstractOneInputEmbeddedPythonFunctionOperator<IN, OUT> implements Triggerable<K, W> {
    private static final long serialVersionUID = 1;
    private final TypeSerializer<W> windowSerializer;
    private transient TypeInformation<K> keyTypeInfo;
    private transient PythonTypeUtils.DataConverter<K, Object> keyConverter;
    private transient EmbeddedPythonWindowOperator<K, IN, OUT, W>.WindowContextImpl windowContext;
    private transient EmbeddedPythonWindowOperator<K, IN, OUT, W>.WindowTimerContextImpl windowTimerContext;

    /* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator$WindowContextImpl.class */
    private class WindowContextImpl {
        private final InternalTimerService<W> timerService;

        WindowContextImpl(InternalTimerService<W> internalTimerService) {
            this.timerService = internalTimerService;
        }

        public TypeSerializer<W> getWindowSerializer() {
            return EmbeddedPythonWindowOperator.this.windowSerializer;
        }

        public long timestamp() {
            return EmbeddedPythonWindowOperator.this.timestamp;
        }

        public InternalTimerService<W> timerService() {
            return this.timerService;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Object getCurrentKey() {
            return EmbeddedPythonWindowOperator.this.keyConverter.toExternal(((Row) EmbeddedPythonWindowOperator.this.getCurrentKey()).getField(0));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/python/embedded/EmbeddedPythonWindowOperator$WindowTimerContextImpl.class */
    public class WindowTimerContextImpl {
        private final InternalTimerService<W> timerService;
        private InternalTimer<K, W> timer;

        WindowTimerContextImpl(InternalTimerService<W> internalTimerService) {
            this.timerService = internalTimerService;
        }

        public InternalTimerService<W> timerService() {
            return this.timerService;
        }

        public long timestamp() {
            return this.timer.getTimestamp();
        }

        public W getWindow() {
            return (W) this.timer.getNamespace();
        }

        /* JADX WARN: Multi-variable type inference failed */
        public Object getCurrentKey() {
            return EmbeddedPythonWindowOperator.this.keyConverter.toExternal(((Row) this.timer.getKey()).getField(0));
        }
    }

    public EmbeddedPythonWindowOperator(Configuration configuration, DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<IN> typeInformation, TypeInformation<OUT> typeInformation2, TypeSerializer<W> typeSerializer) {
        super(configuration, dataStreamPythonFunctionInfo, typeInformation, typeInformation2);
        this.windowSerializer = (TypeSerializer) Preconditions.checkNotNull(typeSerializer);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedDataStreamPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.embedded.AbstractEmbeddedPythonFunctionOperator, org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator
    public void open() throws Exception {
        this.keyTypeInfo = getInputTypeInfo().getTypeAt(0);
        this.keyConverter = PythonTypeUtils.TypeInfoToDataConverter.typeInfoDataConverter(this.keyTypeInfo);
        InternalTimerService internalTimerService = getInternalTimerService("window-timers", this.windowSerializer, this);
        this.windowContext = new WindowContextImpl(internalTimerService);
        this.windowTimerContext = new WindowTimerContextImpl(internalTimerService);
        super.open();
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator
    public List<FlinkFnApi.UserDefinedDataStreamFunction> createUserDefinedFunctionsProto() {
        return ProtoUtils.createUserDefinedDataStreamStatefulFunctionProtos(getPythonFunctionInfo(), getRuntimeContext(), getJobParameters(), this.keyTypeInfo, PythonOperatorUtils.inBatchExecutionMode(getKeyedStateBackend()), ((Boolean) this.config.get(PythonOptions.PYTHON_METRIC_ENABLED)).booleanValue(), ((Boolean) this.config.get(PythonOptions.PYTHON_PROFILE_ENABLED)).booleanValue(), this.hasSideOutput, ((Integer) this.config.get(PythonOptions.STATE_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_READ_CACHE_SIZE)).intValue(), ((Integer) this.config.get(PythonOptions.MAP_STATE_WRITE_CACHE_SIZE)).intValue());
    }

    public void onEventTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.collector.setAbsoluteTimestamp(internalTimer.getTimestamp());
        invokeUserFunction(internalTimer);
    }

    public void onProcessingTime(InternalTimer<K, W> internalTimer) throws Exception {
        this.collector.eraseTimestamp();
        invokeUserFunction(internalTimer);
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator
    public Object getFunctionContext() {
        return this.windowContext;
    }

    @Override // org.apache.flink.streaming.api.operators.python.embedded.AbstractOneInputEmbeddedPythonFunctionOperator
    public Object getTimerContext() {
        return this.windowTimerContext;
    }

    @Override // org.apache.flink.streaming.api.operators.python.DataStreamPythonFunctionOperator
    public <T> DataStreamPythonFunctionOperator<T> copy(DataStreamPythonFunctionInfo dataStreamPythonFunctionInfo, TypeInformation<T> typeInformation) {
        return new EmbeddedPythonWindowOperator(this.config, dataStreamPythonFunctionInfo, getInputTypeInfo(), typeInformation, this.windowSerializer);
    }

    private void invokeUserFunction(InternalTimer<K, W> internalTimer) throws Exception {
        ((WindowTimerContextImpl) this.windowTimerContext).timer = internalTimer;
        PyIterator pyIterator = (PyIterator) this.interpreter.invokeMethod("operation", "on_timer", Long.valueOf(internalTimer.getTimestamp()));
        while (pyIterator.hasNext()) {
            this.collector.collect(this.outputDataConverter.toInternal(pyIterator.next()));
        }
        pyIterator.close();
        ((WindowTimerContextImpl) this.windowTimerContext).timer = null;
    }
}
