package org.apache.flink.runtime.asyncprocessing.operators;

import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclaredVariable;
import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedProcessFunction;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/**
 * Stream operator that runs a {@link KeyedProcessFunction} on top of the declaration-based
 * asynchronous processing facilities provided by {@link AbstractAsyncStateUdfStreamOperator}.
 *
 * <p>Per-record values that the user function can observe (the current timestamp and, for timers,
 * the time domain) are kept in {@link DeclaredVariable}s obtained from the operator's {@link
 * DeclarationContext} rather than in plain fields.
 *
 * <p>If the user function is a {@link DeclaringAsyncKeyedProcessFunction}, its declared processors
 * are used; otherwise {@code processElement} and {@code onTimer} are invoked directly.
 *
 * @param <K> type of the key
 * @param <IN> type of the input elements
 * @param <OUT> type of the output elements
 */
@Internal
public class AsyncKeyedProcessOperator<K, IN, OUT>
        extends AbstractAsyncStateUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {

    private static final long serialVersionUID = 1L;

    /** Timestamp variable shared between {@link #collector} and {@link #context}. */
    private transient DeclaredVariable<Long> sharedTimestamp;

    /** Collector handed to the user function; stamps records using {@link #sharedTimestamp}. */
    private transient TimestampedCollectorWithDeclaredVariable<OUT> collector;

    /** Context passed to the user function when processing elements. */
    private transient ContextImpl context;

    /** Context passed to the user function when a timer fires. */
    private transient OnTimerContextImpl onTimerContext;

    /** Callback that processes one input value; set up in {@link #open()}. */
    private transient ThrowingConsumer<IN, Exception> processor;

    /** Callback that processes one timer timestamp; set up in {@link #open()}. */
    private transient ThrowingConsumer<Long, Exception> timerProcessor;

    /** {@code Context} implementation used while processing elements. */
    private class ContextImpl extends KeyedProcessFunction<K, IN, OUT>.Context {

        private final TimerService timerService;

        private final DeclaredVariable<Long> timestamp;

        /**
         * @param keyedProcessFunction the function instance that owns this context; must not be
         *     {@code null}
         * @param timerService timer service exposed to the user function; must not be {@code null}
         * @param declaredVariable variable holding the timestamp reported by this context
         */
        ContextImpl(
                KeyedProcessFunction<K, IN, OUT> keyedProcessFunction,
                TimerService timerService,
                DeclaredVariable<Long> declaredVariable) {
            // Context is an inner class of KeyedProcessFunction, so the superclass constructor
            // must be invoked on an enclosing function instance. A qualified call also throws
            // NullPointerException when the function is null.
            keyedProcessFunction.super();
            this.timerService = Preconditions.checkNotNull(timerService);
            this.timestamp = declaredVariable;
        }

        /** Returns the current value of the timestamp variable (may be {@code null}). */
        @Override
        public Long timestamp() {
            return timestamp.get();
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        /**
         * Emits {@code x} to the side output identified by {@code outputTag}, stamped with the
         * current timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X x) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            // NOTE(review): the timestamp is unboxed here; if the variable can hold null at this
            // point this throws NullPointerException - confirm against the collector's behaviour.
            long recordTimestamp = timestamp.get();
            AsyncKeyedProcessOperator.this.output.collect(
                    outputTag, new StreamRecord<>(x, recordTimestamp));
        }

        @Override
        @SuppressWarnings("unchecked") // the operator exposes its current key untyped
        public K getCurrentKey() {
            return (K) AsyncKeyedProcessOperator.this.getCurrentKey();
        }
    }

    /**
     * {@code OnTimerContext} implementation used while a timer fires. It declares its own
     * timestamp and time-domain variables instead of using the operator's shared timestamp.
     */
    public class OnTimerContextImpl extends KeyedProcessFunction<K, IN, OUT>.OnTimerContext {

        private final TimerService timerService;

        /** Time domain of the firing timer, stored as the enum constant's name. */
        private final DeclaredVariable<String> timeDomain;

        private final DeclaredVariable<Long> timestamp;

        /**
         * @param keyedProcessFunction the function instance that owns this context; must not be
         *     {@code null}
         * @param timerService timer service exposed to the user function; must not be {@code null}
         * @param declarationContext context used to declare this object's variables
         */
        OnTimerContextImpl(
                KeyedProcessFunction<K, IN, OUT> keyedProcessFunction,
                TimerService timerService,
                DeclarationContext declarationContext) {
            // OnTimerContext is an inner class of KeyedProcessFunction; see ContextImpl.
            keyedProcessFunction.super();
            this.timerService = Preconditions.checkNotNull(timerService);
            this.timeDomain =
                    declarationContext.declareVariable(
                            StringSerializer.INSTANCE, "_OnTimerContextImpl$timeDomain", null);
            this.timestamp =
                    declarationContext.declareVariable(
                            LongSerializer.INSTANCE, "_OnTimerContextImpl$timestamp", null);
        }

        /** Records the timestamp and time domain of the timer that is about to be handled. */
        public void setTime(long j, TimeDomain timeDomain) {
            this.timestamp.set(j);
            this.timeDomain.set(timeDomain.name());
        }

        /**
         * Returns the timestamp of the firing timer.
         *
         * @throws IllegalStateException if no timestamp has been set
         */
        @Override
        public Long timestamp() {
            Long current = timestamp.get();
            Preconditions.checkState(current != null);
            return current;
        }

        @Override
        public TimerService timerService() {
            return timerService;
        }

        /**
         * Emits {@code x} to the side output identified by {@code outputTag}, stamped with the
         * timer's timestamp.
         *
         * @throws IllegalArgumentException if {@code outputTag} is {@code null}
         * @throws IllegalStateException if no timestamp has been set
         */
        @Override
        public <X> void output(OutputTag<X> outputTag, X x) {
            if (outputTag == null) {
                throw new IllegalArgumentException("OutputTag must not be null.");
            }
            AsyncKeyedProcessOperator.this.output.collect(
                    outputTag, new StreamRecord<>(x, timestamp()));
        }

        /**
         * Returns the time domain of the firing timer.
         *
         * @throws IllegalStateException if no time domain has been set
         */
        @Override
        public TimeDomain timeDomain() {
            String domainName = timeDomain.get();
            Preconditions.checkState(domainName != null);
            return TimeDomain.valueOf(domainName);
        }

        @Override
        @SuppressWarnings("unchecked") // the operator exposes its current key untyped
        public K getCurrentKey() {
            return (K) AsyncKeyedProcessOperator.this.getCurrentKey();
        }
    }

    /**
     * Creates an operator that wraps the given function.
     *
     * @param keyedProcessFunction the user function to run
     */
    public AsyncKeyedProcessOperator(KeyedProcessFunction<K, IN, OUT> keyedProcessFunction) {
        super(keyedProcessFunction);
    }

    /**
     * Declares the shared timestamp variable, builds the collector, timer service and both
     * contexts, and then selects the element and timer processors depending on whether the user
     * function is a {@link DeclaringAsyncKeyedProcessFunction}.
     */
    @Override
    public void open() throws Exception {
        super.open();

        sharedTimestamp =
                declarationContext.declareVariable(
                        LongSerializer.INSTANCE,
                        "_AsyncKeyedProcessOperator$sharedTimestamp",
                        null);
        collector = new TimestampedCollectorWithDeclaredVariable<>(output, sharedTimestamp);

        TimerService timerService =
                new SimpleTimerService(
                        getInternalTimerService(
                                "user-timers", VoidNamespaceSerializer.INSTANCE, this));

        context = new ContextImpl(userFunction, timerService, sharedTimestamp);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService, declarationContext);

        if (userFunction instanceof DeclaringAsyncKeyedProcessFunction) {
            DeclaringAsyncKeyedProcessFunction<K, IN, OUT> declaringFunction =
                    (DeclaringAsyncKeyedProcessFunction<K, IN, OUT>) userFunction;
            // Order matters: variables are declared before the processors that may use them.
            declaringFunction.declareVariables(declarationContext);
            processor = declaringFunction.declareProcess(declarationContext, context, collector);
            timerProcessor =
                    declaringFunction.declareOnTimer(declarationContext, onTimerContext, collector);
        } else {
            processor = value -> userFunction.processElement(value, context, collector);
            timerProcessor = time -> userFunction.onTimer(time, onTimerContext, collector);
        }
    }

    /** Handles an event-time timer: stamps output with the timer's timestamp. */
    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        collector.setAbsoluteTimestamp(internalTimer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, internalTimer);
    }

    /** Handles a processing-time timer: erases the collector's timestamp first. */
    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> internalTimer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, internalTimer);
    }

    /** Sets the collector's timestamp from the record and passes its value to the processor. */
    @Override
    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        collector.setTimestamp(streamRecord);
        processor.accept(streamRecord.getValue());
    }

    /** Publishes the timer's time and domain to the timer context, then runs the processor. */
    private void invokeUserFunction(
            TimeDomain timeDomain, InternalTimer<K, VoidNamespace> internalTimer)
            throws Exception {
        long firingTime = internalTimer.getTimestamp();
        onTimerContext.setTime(firingTime, timeDomain);
        timerProcessor.accept(firingTime);
    }
}
