package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierAlignmentUtil;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
@NotThreadSafe
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler.class */
public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);
    private final String taskName;
    private final ControllerImpl context;
    private final BarrierAlignmentUtil.DelayableTimer registerTimer;
    private final SubtaskCheckpointCoordinator subTaskCheckpointCoordinator;
    private final CheckpointableInput[] inputs;
    private long currentCheckpointId;

    @Nullable
    private CheckpointBarrier pendingCheckpointBarrier;
    private final Set<InputChannelInfo> alignedChannels;
    private int targetChannelCount;
    private long lastCancelledOrCompletedCheckpointId;
    private int numOpenChannels;
    private CompletableFuture<Void> allBarriersReceivedFuture;
    private BarrierHandlerState currentState;
    private BarrierAlignmentUtil.Cancellable currentAlignmentTimer;
    private final boolean alternating;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/io/checkpointing/SingleCheckpointBarrierHandler$ControllerImpl.class */
    private final class ControllerImpl implements BarrierHandlerState.Controller {
        private ControllerImpl() {
        }

        @Override // org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState.Controller
        public void triggerGlobalCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
            SingleCheckpointBarrierHandler.this.triggerCheckpoint(checkpointBarrier);
        }

        @Override // org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState.Controller
        public boolean isTimedOut(CheckpointBarrier checkpointBarrier) {
            return checkpointBarrier.getCheckpointOptions().isTimeoutable() && checkpointBarrier.getId() <= SingleCheckpointBarrierHandler.this.currentCheckpointId && checkpointBarrier.getCheckpointOptions().getAlignedCheckpointTimeout() < SingleCheckpointBarrierHandler.this.getClock().absoluteTimeMillis() - checkpointBarrier.getTimestamp();
        }

        @Override // org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState.Controller
        public boolean allBarriersReceived() {
            return SingleCheckpointBarrierHandler.this.alignedChannels.size() == SingleCheckpointBarrierHandler.this.targetChannelCount;
        }

        @Override // org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState.Controller
        @Nullable
        public CheckpointBarrier getPendingCheckpointBarrier() {
            return SingleCheckpointBarrierHandler.this.pendingCheckpointBarrier;
        }

        @Override // org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState.Controller
        public void initInputsCheckpoint(CheckpointBarrier checkpointBarrier) throws CheckpointException {
            Preconditions.checkState(SingleCheckpointBarrierHandler.this.subTaskCheckpointCoordinator != null);
            SingleCheckpointBarrierHandler.this.subTaskCheckpointCoordinator.initInputsCheckpoint(checkpointBarrier.getId(), checkpointBarrier.getCheckpointOptions());
        }
    }

    @VisibleForTesting
    public static SingleCheckpointBarrierHandler createUnalignedCheckpointBarrierHandler(SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, String str, CheckpointableTask checkpointableTask, Clock clock, boolean z, CheckpointableInput... checkpointableInputArr) {
        return unaligned(str, checkpointableTask, subtaskCheckpointCoordinator, clock, (int) Arrays.stream(checkpointableInputArr).flatMap(checkpointableInput -> {
            return checkpointableInput.getChannelInfos().stream();
        }).count(), (callable, duration) -> {
            throw new IllegalStateException("Strictly unaligned checkpoints should never register any callbacks");
        }, z, checkpointableInputArr);
    }

    public static SingleCheckpointBarrierHandler unaligned(String str, CheckpointableTask checkpointableTask, SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, Clock clock, int i, BarrierAlignmentUtil.DelayableTimer delayableTimer, boolean z, CheckpointableInput... checkpointableInputArr) {
        return new SingleCheckpointBarrierHandler(str, checkpointableTask, subtaskCheckpointCoordinator, clock, i, new AlternatingWaitingForFirstBarrierUnaligned(false, new ChannelState(checkpointableInputArr)), false, delayableTimer, checkpointableInputArr, z);
    }

    public static SingleCheckpointBarrierHandler aligned(String str, CheckpointableTask checkpointableTask, Clock clock, int i, BarrierAlignmentUtil.DelayableTimer delayableTimer, boolean z, CheckpointableInput... checkpointableInputArr) {
        return new SingleCheckpointBarrierHandler(str, checkpointableTask, null, clock, i, new WaitingForFirstBarrier(checkpointableInputArr), false, delayableTimer, checkpointableInputArr, z);
    }

    public static SingleCheckpointBarrierHandler alternating(String str, CheckpointableTask checkpointableTask, SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, Clock clock, int i, BarrierAlignmentUtil.DelayableTimer delayableTimer, boolean z, CheckpointableInput... checkpointableInputArr) {
        return new SingleCheckpointBarrierHandler(str, checkpointableTask, subtaskCheckpointCoordinator, clock, i, new AlternatingWaitingForFirstBarrier(new ChannelState(checkpointableInputArr)), true, delayableTimer, checkpointableInputArr, z);
    }

    private SingleCheckpointBarrierHandler(String str, CheckpointableTask checkpointableTask, @Nullable SubtaskCheckpointCoordinator subtaskCheckpointCoordinator, Clock clock, int i, BarrierHandlerState barrierHandlerState, boolean z, BarrierAlignmentUtil.DelayableTimer delayableTimer, CheckpointableInput[] checkpointableInputArr, boolean z2) {
        super(checkpointableTask, clock, z2);
        this.currentCheckpointId = -1L;
        this.alignedChannels = new HashSet();
        this.lastCancelledOrCompletedCheckpointId = -1L;
        this.allBarriersReceivedFuture = new CompletableFuture<>();
        this.taskName = str;
        this.numOpenChannels = i;
        this.currentState = barrierHandlerState;
        this.alternating = z;
        this.registerTimer = delayableTimer;
        this.subTaskCheckpointCoordinator = subtaskCheckpointCoordinator;
        this.context = new ControllerImpl();
        this.inputs = checkpointableInputArr;
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processBarrier(CheckpointBarrier checkpointBarrier, InputChannelInfo inputChannelInfo, boolean z) throws IOException {
        long id = checkpointBarrier.getId();
        LOG.debug("{}: Received barrier from channel {} @ {}.", new Object[]{this.taskName, inputChannelInfo, Long.valueOf(id)});
        if (this.currentCheckpointId > id || (this.currentCheckpointId == id && !isCheckpointPending())) {
            if (checkpointBarrier.getCheckpointOptions().isUnalignedCheckpoint()) {
                return;
            }
            this.inputs[inputChannelInfo.getGateIdx()].resumeConsumption(inputChannelInfo);
        } else {
            checkNewCheckpoint(checkpointBarrier);
            Preconditions.checkState(this.currentCheckpointId == id);
            markCheckpointAlignedAndTransformState(inputChannelInfo, checkpointBarrier, barrierHandlerState -> {
                return barrierHandlerState.barrierReceived(this.context, inputChannelInfo, checkpointBarrier, !z);
            });
        }
    }

    protected void markCheckpointAlignedAndTransformState(InputChannelInfo inputChannelInfo, CheckpointBarrier checkpointBarrier, FunctionWithException<BarrierHandlerState, BarrierHandlerState, Exception> functionWithException) throws IOException {
        this.alignedChannels.add(inputChannelInfo);
        if (this.alignedChannels.size() == 1) {
            if (this.targetChannelCount == 1) {
                markAlignmentStartAndEnd(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
            } else {
                markAlignmentStart(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
            }
        }
        if (this.alignedChannels.size() == this.targetChannelCount && this.targetChannelCount > 1) {
            markAlignmentEnd();
        }
        try {
            this.currentState = (BarrierHandlerState) functionWithException.apply(this.currentState);
        } catch (CheckpointException e) {
            abortInternal(this.currentCheckpointId, e);
        } catch (Exception e2) {
            ExceptionUtils.rethrowIOException(e2);
        }
        if (this.alignedChannels.size() == this.targetChannelCount) {
            this.alignedChannels.clear();
            this.lastCancelledOrCompletedCheckpointId = this.currentCheckpointId;
            LOG.debug("{}: All the channels are aligned for checkpoint {}.", this.taskName, Long.valueOf(this.currentCheckpointId));
            resetAlignmentTimer();
            this.allBarriersReceivedFuture.complete(null);
        }
    }

    private void triggerCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        LOG.debug("{}: Triggering checkpoint {} on the barrier announcement at {}.", new Object[]{this.taskName, Long.valueOf(checkpointBarrier.getId()), Long.valueOf(checkpointBarrier.getTimestamp())});
        notifyCheckpoint(checkpointBarrier);
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processBarrierAnnouncement(CheckpointBarrier checkpointBarrier, int i, InputChannelInfo inputChannelInfo) throws IOException {
        checkNewCheckpoint(checkpointBarrier);
        long id = checkpointBarrier.getId();
        if (this.currentCheckpointId > id || (this.currentCheckpointId == id && !isCheckpointPending())) {
            LOG.debug("{}: Obsolete announcement of checkpoint {} for channel {}.", new Object[]{this.taskName, Long.valueOf(id), inputChannelInfo});
        } else {
            this.currentState = this.currentState.announcementReceived(this.context, inputChannelInfo, i);
        }
    }

    private void registerAlignmentTimer(CheckpointBarrier checkpointBarrier) {
        this.currentAlignmentTimer = this.registerTimer.registerTask(() -> {
            long id = checkpointBarrier.getId();
            try {
                if (this.currentCheckpointId == id && !getAllBarriersReceivedFuture(id).isDone()) {
                    this.currentState = this.currentState.alignedCheckpointTimeout(this.context, checkpointBarrier);
                }
            } catch (CheckpointException e) {
                abortInternal(id, e);
            } catch (Exception e2) {
                ExceptionUtils.rethrowIOException(e2);
            }
            this.currentAlignmentTimer = null;
            return null;
        }, Duration.ofMillis(BarrierAlignmentUtil.getTimerDelay(getClock(), checkpointBarrier)));
    }

    private void checkNewCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
        long id = checkpointBarrier.getId();
        if (this.currentCheckpointId >= id) {
            return;
        }
        if (isCheckpointPending()) {
            cancelSubsumedCheckpoint(id);
        }
        this.currentCheckpointId = id;
        this.pendingCheckpointBarrier = checkpointBarrier;
        this.alignedChannels.clear();
        this.targetChannelCount = this.numOpenChannels;
        this.allBarriersReceivedFuture = new CompletableFuture<>();
        if (this.alternating && checkpointBarrier.getCheckpointOptions().isTimeoutable()) {
            registerAlignmentTimer(checkpointBarrier);
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker, InputChannelInfo inputChannelInfo) throws IOException {
        long checkpointId = cancelCheckpointMarker.getCheckpointId();
        if (checkpointId > this.currentCheckpointId || (checkpointId == this.currentCheckpointId && this.alignedChannels.size() > 0)) {
            LOG.debug("{}: Received cancellation {}.", this.taskName, Long.valueOf(checkpointId));
            abortInternal(checkpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
        }
    }

    private void abortInternal(long j, CheckpointFailureReason checkpointFailureReason) throws IOException {
        abortInternal(j, new CheckpointException(checkpointFailureReason));
    }

    private void abortInternal(long j, CheckpointException checkpointException) throws IOException {
        LOG.debug("{}: Aborting checkpoint {} after exception {}.", new Object[]{this.taskName, Long.valueOf(this.currentCheckpointId), checkpointException});
        this.currentCheckpointId = Math.max(j, this.currentCheckpointId);
        this.lastCancelledOrCompletedCheckpointId = Math.max(this.lastCancelledOrCompletedCheckpointId, j);
        this.pendingCheckpointBarrier = null;
        this.alignedChannels.clear();
        this.targetChannelCount = 0;
        resetAlignmentTimer();
        this.currentState = this.currentState.abort(j);
        if (j == this.currentCheckpointId) {
            resetAlignment();
        }
        notifyAbort(j, checkpointException);
        this.allBarriersReceivedFuture.completeExceptionally(checkpointException);
    }

    private void resetAlignmentTimer() {
        if (this.currentAlignmentTimer != null) {
            this.currentAlignmentTimer.cancel();
            this.currentAlignmentTimer = null;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public void processEndOfPartition(InputChannelInfo inputChannelInfo) throws IOException {
        this.numOpenChannels--;
        if (isCheckpointAfterTasksFinishedEnabled()) {
            if (isCheckpointPending()) {
                Preconditions.checkState(this.pendingCheckpointBarrier != null, "pending checkpoint barrier should not be null when there is pending checkpoint.");
                markCheckpointAlignedAndTransformState(inputChannelInfo, this.pendingCheckpointBarrier, barrierHandlerState -> {
                    return barrierHandlerState.endOfPartitionReceived(this.context, inputChannelInfo);
                });
                return;
            }
            return;
        }
        if (isCheckpointPending()) {
            LOG.warn("{}: Received EndOfPartition(-1) before completing current checkpoint {}. Skipping current checkpoint.", this.taskName, Long.valueOf(this.currentCheckpointId));
            abortInternal(this.currentCheckpointId, CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public long getLatestCheckpointId() {
        return this.currentCheckpointId;
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        resetAlignmentTimer();
        this.allBarriersReceivedFuture.cancel(false);
        super.close();
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    protected boolean isCheckpointPending() {
        return this.currentCheckpointId != this.lastCancelledOrCompletedCheckpointId && this.currentCheckpointId >= 0;
    }

    private void cancelSubsumedCheckpoint(long j) throws IOException {
        LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", new Object[]{this.taskName, Long.valueOf(j), Long.valueOf(this.currentCheckpointId)});
        abortInternal(this.currentCheckpointId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
    }

    @Override // org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler
    public CompletableFuture<Void> getAllBarriersReceivedFuture(long j) {
        if (j < this.currentCheckpointId || this.numOpenChannels == 0) {
            return FutureUtils.completedVoidFuture();
        }
        if (j > this.currentCheckpointId) {
            throw new IllegalStateException("Checkpoint " + j + " has not been started at all");
        }
        return this.allBarriersReceivedFuture;
    }

    @VisibleForTesting
    int getNumOpenChannels() {
        return this.numOpenChannels;
    }

    public String toString() {
        return String.format("%s: current checkpoint: %d, current aligned channels: %d, target channel count: %d", this.taskName, Long.valueOf(this.currentCheckpointId), Integer.valueOf(this.alignedChannels.size()), Integer.valueOf(this.targetChannelCount));
    }
}
