package org.apache.beam.runners.flink.translation.wrappers.streaming.io;

import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.adapter.FlinkKey;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.ValueWithRecordId;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.joda.time.Duration;

/* loaded from: input_file:org/apache/beam/runners/flink/translation/wrappers/streaming/io/DedupingOperator.class */
/**
 * Stream operator that forwards an element only when no deduplication state exists for the
 * current key, unwrapping the {@link ValueWithRecordId} payload of every element it forwards.
 *
 * <p>For each key the operator stores the processing time at which an element was last seen.
 * Every incoming element refreshes that timestamp and registers a processing-time timer
 * {@link #MAX_RETENTION_SINCE_ACCESS} milliseconds later; the timer clears the state only if the
 * key has not been seen again in the meantime.
 *
 * <p>NOTE(review): state is obtained via {@code getPartitionedState}, so this presumably has to
 * run on a stream keyed by record id - confirm against the translator that wires it up.
 *
 * @param <T> type of the payload carried inside the {@link ValueWithRecordId} wrapper
 */
public class DedupingOperator<T> extends AbstractStreamOperator<WindowedValue<T>> implements OneInputStreamOperator<WindowedValue<ValueWithRecordId<T>>, WindowedValue<T>>, Triggerable<FlinkKey, VoidNamespace> {
    /** Milliseconds a key's state is kept after the last element seen for it (10 minutes). */
    private static final long MAX_RETENTION_SINCE_ACCESS = Duration.standardMinutes(10).getMillis();

    private final SerializablePipelineOptions options;

    /** Describes the per-key state holding the processing time of the most recent element. */
    private final ValueStateDescriptor<Long> dedupingStateDescriptor = new ValueStateDescriptor<>("dedup-cache", LongSerializer.INSTANCE);

    /** Assigned in {@link #initializeState}; transient, so it is not serialized with the operator. */
    private transient InternalTimerService<VoidNamespace> timerService;

    /**
     * Creates the operator, wrapping the given options in a {@link SerializablePipelineOptions}.
     *
     * @param pipelineOptions options later handed to {@link FileSystems} in {@link #open()}
     */
    public DedupingOperator(PipelineOptions pipelineOptions) {
        this.options = new SerializablePipelineOptions(pipelineOptions);
    }

    /**
     * Delegates to the superclass and then obtains the timer service used for state cleanup,
     * with this operator registered as the timer callback.
     *
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.timerService = getInternalTimerService("dedup-cleanup-timer", VoidNamespaceSerializer.INSTANCE, this);
    }

    /** Installs this operator's pipeline options as the {@link FileSystems} defaults. */
    @Override
    public void open() {
        FileSystems.setDefaultPipelineOptions(this.options.get());
    }

    /**
     * Emits the unwrapped element if the current key has no stored timestamp, then refreshes the
     * timestamp and schedules a cleanup timer.
     *
     * @param streamRecord record whose value still carries the record-id wrapper
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(StreamRecord<WindowedValue<ValueWithRecordId<T>>> streamRecord) throws Exception {
        ValueState<Long> lastSeenState = getPartitionedState(this.dedupingStateDescriptor);
        if (lastSeenState.value() == null) {
            // No timestamp stored for this key: forward the element without its record id.
            WindowedValue<ValueWithRecordId<T>> wrapped = streamRecord.getValue();
            this.output.collect(streamRecord.replace(wrapped.withValue(wrapped.getValue().getValue())));
        }
        // Refresh on every element, duplicates included, so retention counts from the last access.
        long now = this.timerService.currentProcessingTime();
        lastSeenState.update(now);
        this.timerService.registerProcessingTimeTimer(VoidNamespace.INSTANCE, now + MAX_RETENTION_SINCE_ACCESS);
    }

    /** Does nothing; this operator only registers processing-time timers. */
    @Override
    public void onEventTime(InternalTimer<FlinkKey, VoidNamespace> internalTimer) {
    }

    /**
     * Clears the state of the current key if it has not been refreshed since the firing timer was
     * registered.
     *
     * @param internalTimer the processing-time timer that fired
     * @throws Exception if state access fails
     */
    @Override
    public void onProcessingTime(InternalTimer<FlinkKey, VoidNamespace> internalTimer) throws Exception {
        ValueState<Long> lastSeenState = getPartitionedState(this.dedupingStateDescriptor);
        Long lastSeen = lastSeenState.value();
        // The timer was registered at lastSeen + retention; a different stored timestamp means a
        // later element refreshed the state, so that element's own timer handles the cleanup.
        long registeredAt = internalTimer.getTimestamp() - MAX_RETENTION_SINCE_ACCESS;
        if (lastSeen != null && lastSeen.longValue() == registeredAt) {
            lastSeenState.clear();
        }
    }
}
