/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io.checkpointing;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrier;
import org.apache.flink.streaming.runtime.io.checkpointing.AlternatingWaitingForFirstBarrierUnaligned;
import org.apache.flink.streaming.runtime.io.checkpointing.BarrierHandlerState;
import org.apache.flink.streaming.runtime.io.checkpointing.ChannelState;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.WaitingForFirstBarrier;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CheckpointBarrierHandler} that follows one checkpoint at a time.
 *
 * <p>The handler counts the barriers received for {@code currentCheckpointId}, compares that
 * count against the number of open channels, and forwards every barrier, announcement, alignment
 * timeout and abort to the current {@link BarrierHandlerState}, replacing the state with whatever
 * the call returns. A barrier or announcement carrying a higher checkpoint id aborts a still
 * pending checkpoint as subsumed before the new one is tracked.
 *
 * <p>Instances are created through the static factories {@link #unaligned}, {@link #aligned} and
 * {@link #alternating}.
 */
@Internal
@NotThreadSafe
public class SingleCheckpointBarrierHandler extends CheckpointBarrierHandler {

    private static final Logger LOG = LoggerFactory.getLogger(SingleCheckpointBarrierHandler.class);

    /** Task name; used as the prefix of log messages and in {@link #toString()}. */
    private final String taskName;

    /** Callback object handed to the {@link BarrierHandlerState} on every transition. */
    private final ControllerImpl context;

    /** Schedules a callable after the given delay; used for the alignment timeout. */
    private final BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> registerTimer;

    /** May be {@code null}: the {@link #aligned} factory does not supply a coordinator. */
    private final SubtaskCheckpointCoordinator subTaskCheckpointCoordinator;

    /** Inputs, indexed by gate index when resuming consumption of a channel. */
    private final CheckpointableInput[] inputs;

    /** Number of barriers counted so far for the current checkpoint. */
    private int numBarriersReceived;

    /** Highest checkpoint id seen so far, {@code -1} before any checkpoint. */
    private long currentCheckpointId = -1L;

    /** Id of the most recent checkpoint that was completed or aborted by this handler. */
    private long lastCancelledOrCompletedCheckpointId = -1L;

    /** Channels that have not yet signalled end of partition. */
    private int numOpenChannels;

    /** Completed once all barriers of the current checkpoint arrived, or failed on abort. */
    private CompletableFuture<Void> allBarriersReceivedFuture = new CompletableFuture<>();

    /** Current state; every event replaces it with the state returned by the transition. */
    private BarrierHandlerState currentState;

    /** {@code Clock#relativeTimeNanos()} reading taken when the current checkpoint was first seen. */
    private long firstBarrierArrivalTime;

    /** Handle of the pending alignment timer, or {@code null} if none is registered. */
    private CheckpointBarrierHandler.Cancellable currentAlignmentTimer;

    /** Whether alignment timers are registered for timeoutable checkpoints. */
    private final boolean alternating;

    /**
     * Test-only factory for an unaligned handler. The number of open channels is the total number
     * of channel infos over all {@code inputs}, and any attempt to register a timer fails with an
     * {@link IllegalStateException}.
     */
    @VisibleForTesting
    public static SingleCheckpointBarrierHandler createUnalignedCheckpointBarrierHandler(
            SubtaskCheckpointCoordinator checkpointCoordinator,
            String taskName,
            AbstractInvokable toNotifyOnCheckpoint,
            Clock clock,
            CheckpointableInput... inputs) {
        final int totalChannels =
                (int) Arrays.stream(inputs)
                        .flatMap(gate -> gate.getChannelInfos().stream())
                        .count();
        final BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> rejectingTimer =
                (ignoredCallable, ignoredDelay) -> {
                    throw new IllegalStateException("Strictly unaligned checkpoints should never register any callbacks");
                };
        return unaligned(
                taskName,
                toNotifyOnCheckpoint,
                checkpointCoordinator,
                clock,
                totalChannels,
                rejectingTimer,
                inputs);
    }

    /**
     * Creates a non-alternating handler whose initial state is an
     * {@link AlternatingWaitingForFirstBarrierUnaligned} over a fresh {@link ChannelState}.
     */
    public static SingleCheckpointBarrierHandler unaligned(
            String taskName,
            AbstractInvokable toNotifyOnCheckpoint,
            SubtaskCheckpointCoordinator checkpointCoordinator,
            Clock clock,
            int numOpenChannels,
            BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> registerTimer,
            CheckpointableInput... inputs) {
        final BarrierHandlerState initialState =
                new AlternatingWaitingForFirstBarrierUnaligned(false, new ChannelState(inputs));
        return new SingleCheckpointBarrierHandler(
                taskName,
                toNotifyOnCheckpoint,
                checkpointCoordinator,
                clock,
                numOpenChannels,
                initialState,
                false,
                registerTimer,
                inputs);
    }

    /**
     * Creates a non-alternating handler whose initial state is a {@link WaitingForFirstBarrier}.
     * No {@link SubtaskCheckpointCoordinator} is supplied, so the coordinator field is {@code null}.
     */
    public static SingleCheckpointBarrierHandler aligned(
            String taskName,
            AbstractInvokable toNotifyOnCheckpoint,
            Clock clock,
            int numOpenChannels,
            BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> registerTimer,
            CheckpointableInput... inputs) {
        final BarrierHandlerState initialState = new WaitingForFirstBarrier(inputs);
        return new SingleCheckpointBarrierHandler(
                taskName,
                toNotifyOnCheckpoint,
                null,
                clock,
                numOpenChannels,
                initialState,
                false,
                registerTimer,
                inputs);
    }

    /**
     * Creates an alternating handler (alignment timers enabled) whose initial state is an
     * {@link AlternatingWaitingForFirstBarrier} over a fresh {@link ChannelState}.
     */
    public static SingleCheckpointBarrierHandler alternating(
            String taskName,
            AbstractInvokable toNotifyOnCheckpoint,
            SubtaskCheckpointCoordinator checkpointCoordinator,
            Clock clock,
            int numOpenChannels,
            BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> registerTimer,
            CheckpointableInput... inputs) {
        final BarrierHandlerState initialState =
                new AlternatingWaitingForFirstBarrier(new ChannelState(inputs));
        return new SingleCheckpointBarrierHandler(
                taskName,
                toNotifyOnCheckpoint,
                checkpointCoordinator,
                clock,
                numOpenChannels,
                initialState,
                true,
                registerTimer,
                inputs);
    }

    private SingleCheckpointBarrierHandler(
            String taskName,
            AbstractInvokable toNotifyOnCheckpoint,
            @Nullable SubtaskCheckpointCoordinator subTaskCheckpointCoordinator,
            Clock clock,
            int numOpenChannels,
            BarrierHandlerState currentState,
            boolean alternating,
            BiFunction<Callable<?>, Duration, CheckpointBarrierHandler.Cancellable> registerTimer,
            CheckpointableInput[] inputs) {
        super(toNotifyOnCheckpoint, clock);
        this.taskName = taskName;
        this.inputs = inputs;
        this.subTaskCheckpointCoordinator = subTaskCheckpointCoordinator;
        this.registerTimer = registerTimer;
        this.alternating = alternating;
        this.numOpenChannels = numOpenChannels;
        this.currentState = currentState;
        this.context = new ControllerImpl();
    }

    /**
     * Handles a checkpoint barrier received on {@code channelInfo}.
     *
     * <p>A barrier for an older checkpoint, or for the current checkpoint once it is no longer
     * pending, is dropped; if it is not an unaligned barrier the channel's consumption is resumed
     * first. Otherwise the barrier is counted, alignment start/end are marked, and the barrier is
     * passed to the current state. When the count reaches the number of open channels the
     * checkpoint is recorded as completed and the all-barriers future is completed.
     *
     * @throws IOException if a non-{@link CheckpointException} failure of the state transition is
     *     rethrown, or if one of the delegated calls fails
     */
    @Override
    public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelInfo) throws IOException {
        final long barrierId = barrier.getId();
        LOG.debug("{}: Received barrier from channel {} @ {}.", taskName, channelInfo, barrierId);

        final boolean obsolete =
                currentCheckpointId > barrierId
                        || (currentCheckpointId == barrierId && !isCheckpointPending());
        if (obsolete) {
            if (!barrier.getCheckpointOptions().isUnalignedCheckpoint()) {
                inputs[channelInfo.getGateIdx()].resumeConsumption(channelInfo);
            }
            return;
        }

        checkNewCheckpoint(barrier);
        Preconditions.checkState(currentCheckpointId == barrierId);

        final boolean firstBarrierOfCheckpoint = numBarriersReceived == 0;
        numBarriersReceived++;
        if (firstBarrierOfCheckpoint) {
            if (getNumOpenChannels() == 1) {
                markAlignmentStartAndEnd(barrierId, barrier.getTimestamp());
            } else {
                markAlignmentStart(barrierId, barrier.getTimestamp());
            }
        }

        // The end of alignment is marked before the state sees the barrier.
        if (numBarriersReceived == numOpenChannels && getNumOpenChannels() > 1) {
            markAlignmentEnd();
        }

        try {
            currentState = currentState.barrierReceived(context, channelInfo, barrier);
        } catch (CheckpointException e) {
            abortInternal(barrier.getId(), e);
        } catch (Exception e) {
            ExceptionUtils.rethrowIOException(e);
        }

        // An abort above resets the counter to zero, so this branch is then skipped
        // as long as at least one channel is open.
        if (numBarriersReceived == numOpenChannels) {
            numBarriersReceived = 0;
            lastCancelledOrCompletedCheckpointId = currentCheckpointId;
            LOG.debug("{}: Received all barriers for checkpoint {}.", taskName, currentCheckpointId);
            resetAlignmentTimer();
            allBarriersReceivedFuture.complete(null);
        }
    }

    /** Logs the trigger and notifies the checkpoint through {@code notifyCheckpoint}. */
    private void triggerCheckpoint(CheckpointBarrier trigger) throws IOException {
        LOG.debug(
                "{}: Triggering checkpoint {} on the barrier announcement at {}.",
                taskName,
                trigger.getId(),
                trigger.getTimestamp());
        notifyCheckpoint(trigger);
    }

    /**
     * Handles the announcement of a barrier that has not itself been processed yet.
     *
     * <p>The announcement may start tracking a new checkpoint. Announcements for an older
     * checkpoint, or for the current one once it is no longer pending, are only logged.
     */
    @Override
    public void processBarrierAnnouncement(
            CheckpointBarrier announcedBarrier, int sequenceNumber, InputChannelInfo channelInfo)
            throws IOException {
        checkNewCheckpoint(announcedBarrier);

        final long barrierId = announcedBarrier.getId();
        final boolean obsolete =
                currentCheckpointId > barrierId
                        || (currentCheckpointId == barrierId && !isCheckpointPending());
        if (obsolete) {
            LOG.debug(
                    "{}: Obsolete announcement of checkpoint {} for channel {}.",
                    taskName,
                    barrierId,
                    channelInfo);
            return;
        }

        currentState = currentState.announcementReceived(context, channelInfo, sequenceNumber);
    }

    /**
     * Registers a timer that fires after the barrier's alignment timeout. When it fires and the
     * barrier's checkpoint is still the current one with an incomplete all-barriers future, the
     * current state is notified of the timeout.
     */
    private void registerAlignmentTimer(CheckpointBarrier announcedBarrier) {
        final Callable<?> onTimeout =
                () -> {
                    final long barrierId = announcedBarrier.getId();
                    try {
                        final boolean stillAligning =
                                currentCheckpointId == barrierId
                                        && !getAllBarriersReceivedFuture(barrierId).isDone();
                        if (stillAligning) {
                            currentState = currentState.alignmentTimeout(context, announcedBarrier);
                        }
                    } catch (CheckpointException ex) {
                        abortInternal(barrierId, ex);
                    } catch (Exception e) {
                        ExceptionUtils.rethrowIOException(e);
                    }
                    // Not reached when an exception propagates out of the handlers above.
                    currentAlignmentTimer = null;
                    return null;
                };
        final Duration delay =
                Duration.ofMillis(announcedBarrier.getCheckpointOptions().getAlignmentTimeout());
        currentAlignmentTimer = registerTimer.apply(onTimeout, delay);
    }

    /**
     * Starts tracking the barrier's checkpoint if its id is higher than the current one. A still
     * pending checkpoint is aborted as subsumed first; then the counters, the all-barriers future
     * and the arrival time are reset, and an alignment timer is registered for timeoutable
     * checkpoints when this handler is alternating.
     */
    private void checkNewCheckpoint(CheckpointBarrier barrier) throws IOException {
        final long barrierId = barrier.getId();
        if (barrierId <= currentCheckpointId) {
            return;
        }

        if (isCheckpointPending()) {
            cancelSubsumedCheckpoint(barrierId);
        }

        currentCheckpointId = barrierId;
        numBarriersReceived = 0;
        allBarriersReceivedFuture = new CompletableFuture<>();
        firstBarrierArrivalTime = getClock().relativeTimeNanos();

        if (alternating && barrier.getCheckpointOptions().isTimeoutable()) {
            registerAlignmentTimer(barrier);
        }
    }

    /**
     * Aborts the cancelled checkpoint if it is newer than the current one, or if it is the current
     * one and at least one of its barriers has been counted. Other markers are ignored.
     */
    @Override
    public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
        final long cancelledId = cancelBarrier.getCheckpointId();
        final boolean newerThanCurrent = cancelledId > currentCheckpointId;
        final boolean currentInProgress =
                cancelledId == currentCheckpointId && numBarriersReceived > 0;
        if (!(newerThanCurrent || currentInProgress)) {
            return;
        }

        LOG.debug("{}: Received cancellation {}.", taskName, cancelledId);
        abortInternal(cancelledId, CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER);
    }

    /** Wraps {@code reason} in a {@link CheckpointException} and aborts with it. */
    private void abortInternal(long cancelledId, CheckpointFailureReason reason) throws IOException {
        abortInternal(cancelledId, new CheckpointException(reason));
    }

    /**
     * Aborts checkpoint {@code cancelledId}: advances the tracked ids to at least that id, clears
     * the barrier count and alignment timer, lets the current state react, notifies the abort and
     * fails the all-barriers future with {@code exception}.
     */
    private void abortInternal(long cancelledId, CheckpointException exception) throws IOException {
        LOG.debug("{}: Aborting checkpoint {} after exception {}.", taskName, currentCheckpointId, exception);

        currentCheckpointId = Math.max(cancelledId, currentCheckpointId);
        lastCancelledOrCompletedCheckpointId =
                Math.max(lastCancelledOrCompletedCheckpointId, cancelledId);
        numBarriersReceived = 0;
        resetAlignmentTimer();
        currentState = currentState.abort(cancelledId);

        // Alignment is only reset when the aborted checkpoint is the one now being tracked.
        if (cancelledId == currentCheckpointId) {
            resetAlignment();
        }

        notifyAbort(cancelledId, exception);
        allBarriersReceivedFuture.completeExceptionally(exception);
    }

    /** Cancels and forgets the pending alignment timer, if there is one. */
    private void resetAlignmentTimer() {
        if (currentAlignmentTimer == null) {
            return;
        }
        currentAlignmentTimer.cancel();
        currentAlignmentTimer = null;
    }

    /**
     * Records that one channel has ended and aborts the current checkpoint if it is still
     * pending.
     */
    @Override
    public void processEndOfPartition() throws IOException {
        numOpenChannels--;
        if (!isCheckpointPending()) {
            return;
        }
        LOG.warn(
                "{}: Received EndOfPartition(-1) before completing current checkpoint {}. Skipping current checkpoint.",
                taskName,
                currentCheckpointId);
        abortInternal(currentCheckpointId, CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM);
    }

    /** Returns the id of the checkpoint currently tracked, {@code -1} if none was seen yet. */
    @Override
    public long getLatestCheckpointId() {
        return currentCheckpointId;
    }

    /** Cancels the alignment timer and the all-barriers future, then closes the superclass. */
    @Override
    public void close() throws IOException {
        resetAlignmentTimer();
        allBarriersReceivedFuture.cancel(false);
        super.close();
    }

    /**
     * A checkpoint is pending when one has been seen (non-negative id) and its id differs from
     * the last completed or cancelled one.
     */
    @Override
    protected boolean isCheckpointPending() {
        return currentCheckpointId >= 0L
                && currentCheckpointId != lastCancelledOrCompletedCheckpointId;
    }

    /** Aborts the current checkpoint because a barrier of checkpoint {@code barrierId} arrived. */
    private void cancelSubsumedCheckpoint(long barrierId) throws IOException {
        LOG.warn(
                "{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.",
                taskName,
                barrierId,
                currentCheckpointId);
        abortInternal(currentCheckpointId, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED);
    }

    /**
     * Returns the all-barriers future for {@code checkpointId}: an already completed future for
     * ids below the current checkpoint, the live future for the current checkpoint.
     *
     * @throws IllegalStateException if {@code checkpointId} is above the current checkpoint id
     */
    @Override
    public CompletableFuture<Void> getAllBarriersReceivedFuture(long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            throw new IllegalStateException("Checkpoint " + checkpointId + " has not been started at all");
        }
        if (checkpointId < currentCheckpointId) {
            return FutureUtils.completedVoidFuture();
        }
        return allBarriersReceivedFuture;
    }

    @VisibleForTesting
    int getNumOpenChannels() {
        return numOpenChannels;
    }

    @Override
    public String toString() {
        return String.format(
                "%s: current checkpoint: %d, current barriers: %d, open channels: %d",
                taskName,
                currentCheckpointId,
                numBarriersReceived,
                numOpenChannels);
    }

    /**
     * {@link BarrierHandlerState.Controller} implementation backed by the enclosing handler's
     * fields and methods.
     */
    private final class ControllerImpl implements BarrierHandlerState.Controller {

        /** Triggers the checkpoint for the given barrier on the enclosing handler. */
        @Override
        public void triggerGlobalCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
            SingleCheckpointBarrierHandler.this.triggerCheckpoint(checkpointBarrier);
        }

        /**
         * Returns {@code true} if the barrier is timeoutable, does not belong to a checkpoint newer
         * than the current one, and more time than its alignment timeout (milliseconds, compared in
         * nanoseconds) has elapsed since the current checkpoint was first seen.
         */
        @Override
        public boolean isTimedOut(CheckpointBarrier barrier) {
            if (!barrier.getCheckpointOptions().isTimeoutable()) {
                return false;
            }
            if (barrier.getId() > SingleCheckpointBarrierHandler.this.currentCheckpointId) {
                return false;
            }
            final long timeoutNanos = barrier.getCheckpointOptions().getAlignmentTimeout() * 1000000L;
            final long elapsedNanos =
                    SingleCheckpointBarrierHandler.this.getClock().relativeTimeNanos()
                            - SingleCheckpointBarrierHandler.this.firstBarrierArrivalTime;
            return timeoutNanos < elapsedNanos;
        }

        /** Returns whether the barrier count equals the number of open channels. */
        @Override
        public boolean allBarriersReceived() {
            return SingleCheckpointBarrierHandler.this.numBarriersReceived
                    == SingleCheckpointBarrierHandler.this.numOpenChannels;
        }

        /**
         * Delegates to the subtask checkpoint coordinator.
         *
         * @throws IllegalStateException if the handler was created without a coordinator
         */
        @Override
        public void initInputsCheckpoint(CheckpointBarrier checkpointBarrier) throws CheckpointException {
            Preconditions.checkState(SingleCheckpointBarrierHandler.this.subTaskCheckpointCoordinator != null);
            SingleCheckpointBarrierHandler.this.subTaskCheckpointCoordinator.initInputsCheckpoint(
                    checkpointBarrier.getId(), checkpointBarrier.getCheckpointOptions());
        }
    }
}

