package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Locale;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.IndexedCombinedWatermarkStatus;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.event.WatermarkEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.InternalOperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamOperatorStateHandler;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributesBuilder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.streaming.util.LatencyStats;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
/* loaded from: input_file:org/apache/flink/streaming/api/operators/AbstractStreamOperator.class */
public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>, YieldingOperator<OUT>, StreamOperatorStateHandler.CheckpointedStreamOperator, KeyContextHandler, Serializable {
    private static final long serialVersionUID = 1;
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
    private transient StreamTask<?, ?> container;
    protected transient StreamConfig config;
    protected transient Output<StreamRecord<OUT>> output;
    protected transient IndexedCombinedWatermarkStatus combinedWatermark;
    private transient StreamingRuntimeContext runtimeContext;

    @Nullable
    private transient MailboxExecutor mailboxExecutor;

    @Nullable
    private transient MailboxWatermarkProcessor watermarkProcessor;
    protected transient KeySelector<?, ?> stateKeySelector1;
    protected transient KeySelector<?, ?> stateKeySelector2;
    protected transient StreamOperatorStateHandler stateHandler;
    protected transient InternalTimeServiceManager<?> timeServiceManager;
    protected transient InternalOperatorMetricGroup metrics;
    protected transient LatencyStats latencyStats;
    protected transient ProcessingTimeService processingTimeService;
    protected transient RecordAttributes lastRecordAttributes1;
    protected transient RecordAttributes lastRecordAttributes2;

    public AbstractStreamOperator() {
    }

    public AbstractStreamOperator(StreamOperatorParameters<OUT> streamOperatorParameters) {
        if (streamOperatorParameters != null) {
            setup(streamOperatorParameters.getContainingTask(), streamOperatorParameters.getStreamConfig(), streamOperatorParameters.getOutput());
            this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(streamOperatorParameters.getProcessingTimeService());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<OUT>> output) {
        LatencyStats.Granularity granularity;
        Environment environment = streamTask.getEnvironment();
        this.container = streamTask;
        this.config = streamConfig;
        this.output = output;
        this.metrics = environment.getMetricGroup().getOrAddOperator(streamConfig.getOperatorID(), streamConfig.getOperatorName());
        this.combinedWatermark = IndexedCombinedWatermarkStatus.forInputsCount(2);
        try {
            Configuration configuration = environment.getTaskManagerInfo().getConfiguration();
            int intValue = ((Integer) configuration.get(MetricOptions.LATENCY_HISTORY_SIZE)).intValue();
            if (intValue <= 0) {
                LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, Integer.valueOf(intValue));
                intValue = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue().intValue();
            }
            String str = (String) configuration.get(MetricOptions.LATENCY_SOURCE_GRANULARITY);
            try {
                granularity = LatencyStats.Granularity.valueOf(str.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                granularity = LatencyStats.Granularity.OPERATOR;
                LOG.warn("Configured value {} option for {} is invalid. Defaulting to {}.", new Object[]{str, MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), granularity});
            }
            this.latencyStats = new LatencyStats(this.metrics.getTaskMetricGroup().addGroup("latency"), intValue, this.container.getIndexInSubtaskGroup(), getOperatorID(), granularity);
        } catch (Exception e2) {
            LOG.warn("An error occurred while instantiating latency metrics.", e2);
            this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().addGroup("latency"), 1, 0, new OperatorID(), LatencyStats.Granularity.SINGLE);
        }
        this.runtimeContext = new StreamingRuntimeContext(environment, environment.getAccumulatorRegistry().getUserMap(), getMetricGroup(), getOperatorID(), getProcessingTimeService(), null, environment.getExternalResourceInfoProvider());
        this.stateKeySelector1 = streamConfig.getStatePartitioner(0, getUserCodeClassloader());
        this.stateKeySelector2 = streamConfig.getStatePartitioner(1, getUserCodeClassloader());
        this.lastRecordAttributes1 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
        this.lastRecordAttributes2 = RecordAttributes.EMPTY_RECORD_ATTRIBUTES;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setProcessingTimeService(ProcessingTimeService processingTimeService) {
        this.processingTimeService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public OperatorMetricGroup getMetricGroup() {
        return this.metrics;
    }

    protected void beforeInitializeStateHandler() {
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public final void initializeState(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception {
        TypeSerializer<?> stateKeySerializer = this.config.getStateKeySerializer(getUserCodeClassloader());
        CloseableRegistry closeableRegistry = (CloseableRegistry) Preconditions.checkNotNull(((StreamTask) Preconditions.checkNotNull(getContainingTask())).getCancelables());
        StreamOperatorStateContext streamOperatorStateContext = streamTaskStateInitializer.streamOperatorStateContext(getOperatorID(), getClass().getSimpleName(), getProcessingTimeService(), this, stateKeySerializer, closeableRegistry, this.metrics, this.config.getManagedMemoryFractionOperatorUseCaseOfSlot(ManagedMemoryUseCase.STATE_BACKEND, this.runtimeContext.getJobConfiguration(), this.runtimeContext.getTaskManagerRuntimeInfo().getConfiguration(), this.runtimeContext.getUserCodeClassLoader()), isUsingCustomRawKeyedState(), isAsyncStateProcessingEnabled());
        this.stateHandler = new StreamOperatorStateHandler(streamOperatorStateContext, getExecutionConfig(), closeableRegistry);
        this.timeServiceManager = isAsyncStateProcessingEnabled() ? streamOperatorStateContext.asyncInternalTimerServiceManager() : streamOperatorStateContext.internalTimerServiceManager();
        beforeInitializeStateHandler();
        this.stateHandler.initializeOperatorState(this);
        this.runtimeContext.setKeyedStateStore(this.stateHandler.getKeyedStateStore().orElse(null));
    }

    @Internal
    protected boolean isUsingCustomRawKeyedState() {
        return false;
    }

    @Internal
    public boolean isAsyncStateProcessingEnabled() {
        return false;
    }

    @Override // org.apache.flink.streaming.api.operators.YieldingOperator
    @Internal
    public void setMailboxExecutor(MailboxExecutor mailboxExecutor) {
        this.mailboxExecutor = mailboxExecutor;
    }

    @Internal
    public boolean useSplittableTimers() {
        return false;
    }

    @Internal
    private boolean areSplittableTimersConfigured() {
        return areSplittableTimersConfigured(this.config);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean areSplittableTimersConfigured(StreamConfig streamConfig) {
        return streamConfig.isCheckpointingEnabled() && streamConfig.isUnalignedCheckpointsEnabled() && streamConfig.isUnalignedCheckpointsSplittableTimersEnabled();
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        if (useSplittableTimers() && areSplittableTimersConfigured() && getTimeServiceManager().isPresent()) {
            this.watermarkProcessor = new MailboxWatermarkProcessor(this.output, this.mailboxExecutor, getTimeServiceManager().get());
        }
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void finish() throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void close() throws Exception {
        if (this.stateHandler != null) {
            this.stateHandler.dispose();
        }
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void prepareSnapshotPreBarrier(long j) throws Exception {
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public OperatorSnapshotFutures snapshotState(long j, long j2, CheckpointOptions checkpointOptions, CheckpointStreamFactory checkpointStreamFactory) throws Exception {
        return this.stateHandler.snapshotState(this, Optional.ofNullable(this.timeServiceManager), getOperatorName(), j, j2, checkpointOptions, checkpointStreamFactory, isUsingCustomRawKeyedState(), isAsyncStateProcessingEnabled());
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        this.stateHandler.notifyCheckpointComplete(j);
    }

    @Override // org.apache.flink.api.common.state.CheckpointListener
    public void notifyCheckpointAborted(long j) throws Exception {
        this.stateHandler.notifyCheckpointAborted(j);
    }

    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String getOperatorName() {
        return this.runtimeContext != null ? this.runtimeContext.getTaskInfo().getTaskNameWithSubtasks() : getClass().getSimpleName();
    }

    @VisibleForTesting
    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.stateHandler.getKeyedStateBackend();
    }

    @VisibleForTesting
    public OperatorStateBackend getOperatorStateBackend() {
        return this.stateHandler.getOperatorStateBackend();
    }

    @VisibleForTesting
    public ProcessingTimeService getProcessingTimeService() {
        return this.processingTimeService;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S) getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        return (S) this.stateHandler.getOrCreateKeyedState(typeSerializer, stateDescriptor);
    }

    public <S extends State, N> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S) this.stateHandler.getPartitionedState(n, typeSerializer, stateDescriptor);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void setKeyContextElement1(StreamRecord streamRecord) throws Exception {
        setKeyContextElement(streamRecord, this.stateKeySelector1);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public void setKeyContextElement2(StreamRecord streamRecord) throws Exception {
        setKeyContextElement(streamRecord, this.stateKeySelector2);
    }

    @Override // org.apache.flink.streaming.api.operators.KeyContextHandler
    @Internal
    public boolean hasKeyContext1() {
        return this.stateKeySelector1 != null;
    }

    @Override // org.apache.flink.streaming.api.operators.KeyContextHandler
    @Internal
    public boolean hasKeyContext2() {
        return this.stateKeySelector2 != null;
    }

    private <T> void setKeyContextElement(StreamRecord<T> streamRecord, KeySelector<T, ?> keySelector) throws Exception {
        if (keySelector != null) {
            setCurrentKey(keySelector.getKey(streamRecord.getValue()));
        }
    }

    @Override // org.apache.flink.streaming.api.operators.KeyContext
    public void setCurrentKey(Object obj) {
        this.stateHandler.setCurrentKey(obj);
    }

    public Object getCurrentKey() {
        return this.stateHandler.getCurrentKey();
    }

    public KeyedStateStore getKeyedStateStore() {
        if (this.stateHandler == null) {
            return null;
        }
        return this.stateHandler.getKeyedStateStore().orElse(null);
    }

    protected KeySelector<?, ?> getStateKeySelector1() {
        return this.stateKeySelector1;
    }

    protected KeySelector<?, ?> getStateKeySelector2() {
        return this.stateKeySelector2;
    }

    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
        reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
        reportOrForwardLatencyMarker(latencyMarker);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void reportOrForwardLatencyMarker(LatencyMarker latencyMarker) {
        this.latencyStats.reportLatency(latencyMarker);
        this.output.emitLatencyMarker(latencyMarker);
    }

    public <K, N> InternalTimerService<N> getInternalTimerService(String str, TypeSerializer<N> typeSerializer, Triggerable<K, N> triggerable) {
        if (this.timeServiceManager == null) {
            throw new RuntimeException("The timer service has not been initialized.");
        }
        InternalTimeServiceManager<?> internalTimeServiceManager = this.timeServiceManager;
        TypeSerializer<?> keySerializer = this.stateHandler.getKeySerializer();
        Preconditions.checkState(keySerializer != null, "Timers can only be used on keyed operators.");
        return internalTimeServiceManager.getInternalTimerService(str, keySerializer, typeSerializer, triggerable);
    }

    public void processWatermark(Watermark watermark) throws Exception {
        if (this.watermarkProcessor != null) {
            this.watermarkProcessor.emitWatermarkInsideMailbox(watermark);
        } else {
            emitWatermarkDirectly(watermark);
        }
    }

    private void emitWatermarkDirectly(Watermark watermark) throws Exception {
        if (this.timeServiceManager != null) {
            this.timeServiceManager.advanceWatermark(watermark);
        }
        this.output.emitWatermark(watermark);
    }

    private void processWatermark(Watermark watermark, int i) throws Exception {
        if (this.combinedWatermark.updateWatermark(i, watermark.getTimestamp())) {
            processWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark()));
        }
    }

    public void processWatermark1(Watermark watermark) throws Exception {
        processWatermark(watermark, 0);
    }

    public void processWatermark2(Watermark watermark) throws Exception {
        processWatermark(watermark, 1);
    }

    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.output.emitWatermarkStatus(watermarkStatus);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void processWatermarkStatus(WatermarkStatus watermarkStatus, int i) throws Exception {
        boolean isIdle = this.combinedWatermark.isIdle();
        if (this.combinedWatermark.updateStatus(i, watermarkStatus.isIdle())) {
            processWatermark(new Watermark(this.combinedWatermark.getCombinedWatermark()));
        }
        if (isIdle != this.combinedWatermark.isIdle()) {
            this.output.emitWatermarkStatus(watermarkStatus);
        }
    }

    public final void processWatermarkStatus1(WatermarkStatus watermarkStatus) throws Exception {
        processWatermarkStatus(watermarkStatus, 0);
    }

    public final void processWatermarkStatus2(WatermarkStatus watermarkStatus) throws Exception {
        processWatermarkStatus(watermarkStatus, 1);
    }

    @Override // org.apache.flink.streaming.api.operators.StreamOperator
    public OperatorID getOperatorID() {
        return this.config.getOperatorID();
    }

    protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
        return Optional.ofNullable(this.timeServiceManager);
    }

    @Experimental
    public void processRecordAttributes(RecordAttributes recordAttributes) throws Exception {
        this.output.emitRecordAttributes(new RecordAttributesBuilder(Collections.singletonList(recordAttributes)).build());
    }

    @Experimental
    public void processRecordAttributes1(RecordAttributes recordAttributes) {
        this.lastRecordAttributes1 = recordAttributes;
        this.output.emitRecordAttributes(new RecordAttributesBuilder(Arrays.asList(this.lastRecordAttributes1, this.lastRecordAttributes2)).build());
    }

    @Experimental
    public void processRecordAttributes2(RecordAttributes recordAttributes) {
        this.lastRecordAttributes2 = recordAttributes;
        this.output.emitRecordAttributes(new RecordAttributesBuilder(Arrays.asList(this.lastRecordAttributes1, this.lastRecordAttributes2)).build());
    }

    @Experimental
    public void processWatermark(WatermarkEvent watermarkEvent) throws Exception {
        this.output.emitWatermark(watermarkEvent);
    }

    @Experimental
    public void processWatermark1(WatermarkEvent watermarkEvent) throws Exception {
        this.output.emitWatermark(watermarkEvent);
    }

    @Experimental
    public void processWatermark2(WatermarkEvent watermarkEvent) throws Exception {
        this.output.emitWatermark(watermarkEvent);
    }
}
