package org.apache.flink.iteration.operator;

import java.util.Objects;
import java.util.function.Consumer;
import org.apache.flink.iteration.IterationID;
import org.apache.flink.iteration.IterationRecord;
import org.apache.flink.iteration.checkpoint.Checkpoints;
import org.apache.flink.iteration.checkpoint.CheckpointsBroker;
import org.apache.flink.statefun.flink.core.feedback.FeedbackChannel;
import org.apache.flink.statefun.flink.core.feedback.FeedbackChannelBroker;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/iteration/operator/TailOperator.class */
public class TailOperator extends AbstractStreamOperator<Void> implements OneInputStreamOperator<IterationRecord<?>, Void> {
    private final IterationID iterationId;
    private final int feedbackIndex;
    private transient Consumer<StreamRecord<IterationRecord<?>>> recordConsumer;
    private transient FeedbackChannel<StreamRecord<IterationRecord<?>>> channel;

    public TailOperator(IterationID iterationID, int i) {
        this.iterationId = (IterationID) Objects.requireNonNull(iterationID);
        this.feedbackIndex = i;
    }

    public void setup(StreamTask<?, ?> streamTask, StreamConfig streamConfig, Output<StreamRecord<Void>> output) {
        super.setup(streamTask, streamConfig, output);
    }

    public void open() throws Exception {
        super.open();
        this.channel = FeedbackChannelBroker.get().getChannel(OperatorUtils.createFeedbackKey(this.iterationId, this.feedbackIndex).withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()));
        this.recordConsumer = getExecutionConfig().isObjectReuseEnabled() ? this::processIfObjectReuseEnabled : this::processIfObjectReuseNotEnabled;
    }

    public void processElement(StreamRecord<IterationRecord<?>> streamRecord) {
        this.recordConsumer.accept(streamRecord);
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        this.channel.put(new StreamRecord(IterationRecord.newBarrier(j)));
    }

    public void notifyCheckpointAborted(long j) throws Exception {
        super.notifyCheckpointAborted(j);
        Checkpoints checkpoints = CheckpointsBroker.get().getCheckpoints(OperatorUtils.createFeedbackKey(this.iterationId, this.feedbackIndex).withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getAttemptNumber()));
        if (checkpoints != null) {
            checkpoints.abort(j);
        }
    }

    private void processIfObjectReuseEnabled(StreamRecord<IterationRecord<?>> streamRecord) {
        IterationRecord m1clone = ((IterationRecord) streamRecord.getValue()).m1clone();
        m1clone.incrementEpoch();
        this.channel.put(new StreamRecord(m1clone, streamRecord.getTimestamp()));
    }

    private void processIfObjectReuseNotEnabled(StreamRecord<IterationRecord<?>> streamRecord) {
        ((IterationRecord) streamRecord.getValue()).incrementEpoch();
        this.channel.put(new StreamRecord(streamRecord.getValue(), streamRecord.getTimestamp()));
    }

    public void close() throws Exception {
        IOUtils.closeQuietly(this.channel);
        super.close();
    }
}
