package org.apache.flink.datastream.impl.operators;

import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
import org.apache.flink.api.common.watermark.WatermarkHandlingStrategy;
import org.apache.flink.datastream.api.context.ProcessingTimeManager;
import org.apache.flink.datastream.api.context.TwoOutputNonPartitionedContext;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.impl.common.OutputCollector;
import org.apache.flink.datastream.impl.common.TimestampCollector;
import org.apache.flink.datastream.impl.context.DefaultRuntimeContext;
import org.apache.flink.datastream.impl.context.DefaultTwoOutputNonPartitionedContext;
import org.apache.flink.datastream.impl.context.DefaultTwoOutputPartitionedContext;
import org.apache.flink.datastream.impl.context.UnsupportedProcessingTimeManager;
import org.apache.flink.datastream.impl.extension.eventtime.EventTimeExtensionImpl;
import org.apache.flink.datastream.impl.extension.eventtime.functions.EventTimeWrappedTwoOutputStreamProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AbstractAsyncStateUdfStreamOperator;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermark.AbstractInternalWatermarkDeclaration;
import org.apache.flink.streaming.runtime.watermark.extension.eventtime.EventTimeWatermarkHandler;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator.class */
public class TwoOutputProcessOperator<IN, OUT_MAIN, OUT_SIDE> extends AbstractAsyncStateUdfStreamOperator<OUT_MAIN, TwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE>> implements OneInputStreamOperator<IN, OUT_MAIN>, BoundedOneInput {
    protected transient TimestampCollector<OUT_MAIN> mainCollector;
    protected transient TimestampCollector<OUT_SIDE> sideCollector;
    protected transient DefaultRuntimeContext context;
    protected transient DefaultTwoOutputPartitionedContext<OUT_MAIN, OUT_SIDE> partitionedContext;
    protected transient TwoOutputNonPartitionedContext<OUT_MAIN, OUT_SIDE> nonPartitionedContext;
    protected OutputTag<OUT_SIDE> outputTag;
    protected transient Map<String, AbstractInternalWatermarkDeclaration<?>> watermarkDeclarationMap;
    protected transient EventTimeWatermarkHandler eventTimeWatermarkHandler;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/datastream/impl/operators/TwoOutputProcessOperator$SideOutputCollector.class */
    public class SideOutputCollector extends TimestampCollector<OUT_SIDE> {
        private final Output<StreamRecord<OUT_MAIN>> output;

        public SideOutputCollector(Output<StreamRecord<OUT_MAIN>> output) {
            this.output = output;
        }

        public void collect(OUT_SIDE out_side) {
            this.output.collect(TwoOutputProcessOperator.this.outputTag, this.reuse.replace(out_side));
        }

        public void collectAndOverwriteTimestamp(OUT_SIDE out_side, long j) {
            setTimestamp(j);
            this.output.collect(TwoOutputProcessOperator.this.outputTag, this.reuse.replace(out_side));
        }
    }

    public TwoOutputProcessOperator(TwoOutputStreamProcessFunction<IN, OUT_MAIN, OUT_SIDE> twoOutputStreamProcessFunction, OutputTag<OUT_SIDE> outputTag) {
        super(twoOutputStreamProcessFunction);
        this.outputTag = outputTag;
    }

    public void open() throws Exception {
        this.mainCollector = getMainCollector();
        this.sideCollector = getSideCollector();
        StreamingRuntimeContext runtimeContext = getRuntimeContext();
        OperatorStateBackend operatorStateBackend = getOperatorStateBackend();
        TaskInfo taskInfo = runtimeContext.getTaskInfo();
        this.context = new DefaultRuntimeContext(runtimeContext.getJobInfo().getJobName(), runtimeContext.getJobType(), taskInfo.getNumberOfParallelSubtasks(), taskInfo.getMaxNumberOfParallelSubtasks(), taskInfo.getTaskName(), taskInfo.getIndexOfThisSubtask(), taskInfo.getAttemptNumber(), runtimeContext.getMetricGroup());
        this.watermarkDeclarationMap = (Map) this.config.getWatermarkDeclarations(getUserCodeClassloader()).stream().collect(Collectors.toMap((v0) -> {
            return v0.getIdentifier();
        }, Function.identity()));
        this.partitionedContext = new DefaultTwoOutputPartitionedContext<>(this.context, this::currentKey, getProcessorWithKey(), getProcessingTimeManager(), runtimeContext, operatorStateBackend);
        this.nonPartitionedContext = getNonPartitionedContext();
        this.partitionedContext.setNonPartitionedContext(this.nonPartitionedContext);
        this.eventTimeWatermarkHandler = new EventTimeWatermarkHandler(1, this.output, this.timeServiceManager);
        if (this.userFunction instanceof EventTimeWrappedTwoOutputStreamProcessFunction) {
            this.userFunction.initEventTimeExtension(getTimerService(), getEventTimeSupplier(), this.eventTimeWatermarkHandler);
        }
        this.userFunction.open(this.nonPartitionedContext);
    }

    public void processElement(StreamRecord<IN> streamRecord) throws Exception {
        this.mainCollector.setTimestampFromStreamRecord(streamRecord);
        this.sideCollector.setTimestampFromStreamRecord(streamRecord);
        this.userFunction.processRecord(streamRecord.getValue(), this.mainCollector, this.sideCollector, this.partitionedContext);
    }

    public void processWatermarkInternal(WatermarkEvent watermarkEvent) throws Exception {
        if (this.userFunction.onWatermark(watermarkEvent.getWatermark(), this.mainCollector, this.sideCollector, this.nonPartitionedContext) == WatermarkHandlingResult.PEEK && this.watermarkDeclarationMap.get(watermarkEvent.getWatermark().getIdentifier()).getDefaultHandlingStrategy() == WatermarkHandlingStrategy.FORWARD) {
            if (EventTimeExtensionImpl.isEventTimeExtensionWatermark(watermarkEvent.getWatermark())) {
                this.eventTimeWatermarkHandler.processWatermark(watermarkEvent.getWatermark(), 0);
            } else {
                this.output.emitWatermark(watermarkEvent);
            }
        }
    }

    public void endInput() throws Exception {
        this.userFunction.endInput(this.nonPartitionedContext);
    }

    protected TimestampCollector<OUT_MAIN> getMainCollector() {
        return new OutputCollector(this.output);
    }

    public TimestampCollector<OUT_SIDE> getSideCollector() {
        return new SideOutputCollector(this.output);
    }

    protected Object currentKey() {
        throw new UnsupportedOperationException("The key is only defined for keyed operator");
    }

    protected BiConsumer<Runnable, Object> getProcessorWithKey() {
        return isAsyncStateProcessingEnabled() ? (runnable, obj) -> {
            Objects.requireNonNull(runnable);
            asyncProcessWithKey(obj, runnable::run);
        } : (runnable2, obj2) -> {
            Object currentKey = currentKey();
            try {
                runnable2.run();
                setCurrentKey(currentKey);
            } catch (Throwable th) {
                setCurrentKey(currentKey);
                throw th;
            }
        };
    }

    protected TwoOutputNonPartitionedContext<OUT_MAIN, OUT_SIDE> getNonPartitionedContext() {
        return new DefaultTwoOutputNonPartitionedContext(this.context, this.partitionedContext, this.mainCollector, this.sideCollector, false, null, this.output, this.watermarkDeclarationMap);
    }

    protected ProcessingTimeManager getProcessingTimeManager() {
        return UnsupportedProcessingTimeManager.INSTANCE;
    }

    public void close() throws Exception {
        super.close();
        this.userFunction.close();
    }

    public boolean isAsyncStateProcessingEnabled() {
        return false;
    }

    protected InternalTimerService<VoidNamespace> getTimerService() {
        return null;
    }

    protected Supplier<Long> getEventTimeSupplier() {
        return () -> {
            return Long.valueOf(this.eventTimeWatermarkHandler.getLastEmitWatermark());
        };
    }
}
